From 09167986d5191d412a2ee1b53f9fef34941de06f Mon Sep 17 00:00:00 2001
From: Ed_
Date: Mon, 22 Jun 2026 10:46:34 -0400
Subject: [PATCH] wip: SSDL analysis (has indentation bug, needs fix)

---
 .../code_path_audit_20260607/state.toml       |   2 +-
 .../code_path_audit_20260607/_fix_ssdl.py     |  35 ++
 src/code_path_audit_ssdl.py                   | 315 ++++++++++++++++++
 3 files changed, 351 insertions(+), 1 deletion(-)
 create mode 100644 scripts/tier2/artifacts/code_path_audit_20260607/_fix_ssdl.py
 create mode 100644 src/code_path_audit_ssdl.py

diff --git a/conductor/tracks/code_path_audit_20260607/state.toml b/conductor/tracks/code_path_audit_20260607/state.toml
index df1f1532..913b2498 100644
--- a/conductor/tracks/code_path_audit_20260607/state.toml
+++ b/conductor/tracks/code_path_audit_20260607/state.toml
@@ -38,7 +38,7 @@ phase_8 = { status = "completed", checkpointsha = "c8253847", name = "v2 DSL (14
 phase_9 = { status = "completed", checkpointsha = "c8253847", name = "run_audit() main entry + CLI + MCP tool" }
 phase_10 = { status = "completed", checkpointsha = "0690dcef", name = "Integration tests (synthetic src/ + audit_inputs/ fixtures)" }
 phase_11 = { status = "completed", checkpointsha = "0690dcef", name = "Live_gui E2E tests (opt-in via CODE_PATH_AUDIT_LIVE_GUI=1) - file created, 2 tests gated on env var" }
-phase_12 = { status = "completed", checkpointsha = "f5f31318", name = "Meta-audit + styleguide (Task 12.2 skipped - audit_optional_in_3_files.py missing on master)" }
+phase_12 = { status = "completed", checkpointsha = "db36495f", name = "Meta-audit + styleguide + audit_optional_in_3_files.py (CREATED from scratch, was missing on master)" }
 phase_13 = { status = "completed", checkpointsha = "d46a71f7", name = "End-of-track report (commit f93421f8) + tracks.md update (commit d46a71f7)" }
 
 [verification]
diff --git a/scripts/tier2/artifacts/code_path_audit_20260607/_fix_ssdl.py b/scripts/tier2/artifacts/code_path_audit_20260607/_fix_ssdl.py
new file mode 100644
index 00000000..7de64561
--- /dev/null
+++ b/scripts/tier2/artifacts/code_path_audit_20260607/_fix_ssdl.py
@@ -0,0 +1,35 @@
+"""Fix indentation in ssdl.py file."""
+import re
+
+filepath = r'C:\projects\manual_slop_tier2\src\code_path_audit_ssdl.py'
+
+with open(filepath, 'r') as f:
+    lines = f.readlines()
+
+# Walk and fix indentation based on Python's standard rules
+new_lines = []
+indent_stack = [0]
+for line in lines:
+    stripped = line.lstrip()
+    if not stripped or stripped.startswith('#'):
+        new_lines.append(line)
+        continue
+    cur_indent = len(line) - len(stripped)
+    # If line ends with ':', expected next indent is cur_indent + 2
+    # If line is at less indent than top of stack, pop
+    while indent_stack and indent_stack[-1] > cur_indent:
+        indent_stack.pop()
+    # Validate the line matches one of the stack
+    if not indent_stack or indent_stack[-1] != cur_indent:
+        # Try to recover by setting the indent to top of stack
+        new_line = ' ' * (indent_stack[-1] if indent_stack else 0) + stripped
+    else:
+        new_line = line
+    new_lines.append(new_line if 'new_line' in dir() else line)
+    if stripped.rstrip().endswith(':') and not stripped.startswith(' '):
+        indent_stack.append(cur_indent + 2)
+
+with open(filepath, 'w') as f:
+    f.writelines(new_lines)
+
+print('done')
\ No newline at end of file
diff --git a/src/code_path_audit_ssdl.py b/src/code_path_audit_ssdl.py
new file mode 100644
index 00000000..0a009484
--- /dev/null
+++ b/src/code_path_audit_ssdl.py
@@ -0,0 +1,315 @@
+"""SSDL analysis for code_path_audit v2.
+
+Translates per-aggregate findings into SSDL (Spec/Sketch Description
+Language) sketches + computes "effective codepaths" + suggests
+specific defusing techniques per aggregate.
+
+This is the layer that produces real DEDUCTIONS on codebase
+organization: not just "this is a fat struct" but "this branch
+explosion can be defused by introducing a nil sentinel here".
+"""
+from __future__ import annotations
+import ast
+from collections import Counter
+from pathlib import Path
+from src.code_path_audit import (
+    AggregateProfile,
+    FunctionRef,
+    AccessPatternEvidence,
+)
+
+
+SSDL_PRIMITIVES: dict[str, str] = {
+    "I": "Instruction (single unit of computation)",
+    "T": "Terminator (returns/exits)",
+    "B": "Branch (conditional fork)",
+    "M": "Merge (control flow reconverges)",
+    "Q": "State Query (reads persistent state)",
+    "S": "State Mutation (writes persistent state)",
+    "N": "Nil Sentinel (defuses branches)",
+}
+
+
+def compute_effective_codepaths(profile: AggregateProfile, src_dir: str = "src") -> int:
+    """Compute the effective codepath count for one aggregate.
+
+    Effective codepaths = sum over all consumer functions of
+    2^(branch_count_in_function).
+
+    This is the combinatoric explosion metric (Fleury).
+    High numbers indicate branch-explosion risk; defusing with
+    nil sentinels or immediate-mode caches reduces it to ~1.
+    """
+    if profile.is_candidate:
+        return 0
+    total = 0
+    for fref in profile.consumers:
+        branches = count_branches_in_function(fref, src_dir)
+        effective = 2 ** branches
+        total += effective
+    return total
+
+
+def count_branches_in_function(fref: FunctionRef, src_dir: str = "src") -> int:
+    """Count the explicit branch points (if/elif/while/try/for/with) in a function."""
+    _p = Path(fref.file)
+    filepath = _p if _p.exists() else Path(src_dir) / fref.file
+    if not filepath.exists():
+        return 0
+    try:
+        source = filepath.read_text(encoding="utf-8")
+        tree = ast.parse(source)
+    except (OSError, SyntaxError):
+        return 0
+    for node in ast.walk(tree):
+        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == fref.fqname.rsplit(".", 1)[-1]:
+            count = 0
+            for sub in ast.walk(node):
+                if isinstance(sub, (ast.If, ast.For, ast.While, ast.With, ast.Try, ast.ExceptHandler)):
+                    count += 1
+                elif isinstance(sub, ast.BoolOp):
+                    count += len(sub.values) - 1
+            return count
+    return 0
+
+
+def detect_nil_check_pattern(fref: FunctionRef, src_dir: str = "src") -> bool:
+    """Detect if the function uses `is None` / `== None` / `!= None` checks (a branch that nil-sentinel could defuse)."""
+    _p = Path(fref.file)
+    filepath = _p if _p.exists() else Path(src_dir) / fref.file
+    if not filepath.exists():
+        return False
+    try:
+        source = filepath.read_text(encoding="utf-8")
+        tree = ast.parse(source)
+    except (OSError, SyntaxError):
+        return False
+    for node in ast.walk(tree):
+        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == fref.fqname.rsplit(".", 1)[-1]:
+            for sub in ast.walk(node):
+                if isinstance(sub, ast.Compare):
+                    for comparator in sub.comparators:
+                        if isinstance(comparator, ast.Constant) and comparator.value is None:
+                            return True
+            return False
+    return False
+
+
+def compute_field_access_efficiency(profile: AggregateProfile) -> float:
+    """Compute field-access efficiency: ratio of typed accesses to total accesses.
+
+    High efficiency (>0.7) means consumers are using the typed fields directly.
+    Low efficiency (<0.3) means consumers are using wildcards or the aggregate
+    is being passed through without field use (candidate for immediate-mode).
+    """
+    if profile.is_candidate:
+        return 1.0
+    tac = profile.type_alias_coverage
+    if tac.total_sites == 0:
+        return 0.0
+    return tac.typed_sites / tac.total_sites
+
+
+def suggest_defusing_technique(profile: AggregateProfile, src_dir: str = "src") -> list[dict]:
+    """Suggest specific SSDL defusing techniques for this aggregate.
+
+    Returns a list of {technique, location, current_state, recommended_change}.
+    """
+    suggestions: list[dict] = []
+    if profile.is_candidate:
+        return suggestions
+    nil_check_count = sum(1 for f in profile.consumers if detect_nil_check_pattern(f, src_dir))
+    effective = compute_effective_codepaths(profile, src_dir)
+    if nil_check_count > 0:
+        suggestions.append({
+            "technique": "Nil Sentinel `[N]`",
+            "location": f"{nil_check_count} consumer function{'s' if nil_check_count != 1 else ''} have `is None` / `== None` checks",
+            "current_state": f"Branch-defusing opportunity: {nil_check_count} functions with explicit None checks",
+            "recommended_change": "Introduce a module-level `NIL_` sentinel whose field accesses return safe defaults. Replace None checks with the sentinel. Reduces effective codepaths from 2^branch_count to 1.",
+            "effective_codepaths_before": effective,
+            "effective_codepaths_after": max(1, effective - nil_check_count),
+        })
+    efficiency = compute_field_access_efficiency(profile)
+    if efficiency < 0.3:
+        suggestions.append({
+            "technique": "Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]`",
+            "location": f"{profile.name} consumers access {profile.type_alias_coverage.total_sites} sites, only {profile.type_alias_coverage.typed_sites} typed ({efficiency*100:.0f}%)",
+            "current_state": "Many consumers use wildcard access (high effective codepaths via defensive checks)",
+            "recommended_change": f"Introduce a `{profile.name.lower()}_cache` keyed lookup. Consumers request by key, get cached value, no field-existence checks. Reduces {profile.type_alias_coverage.total_sites} field-check branches to 1 cache lookup.",
+            "effective_codepaths_before": effective,
+            "effective_codepaths_after": max(1, profile.type_alias_coverage.total_sites),
+        })
+    branch_count = sum(count_branches_in_function(f, src_dir) for f in profile.consumers)
+    if branch_count > 20:
+        suggestions.append({
+            "technique": "Generational Handles `[I:ResolveHandle] -> [B:Gen matches?] -> [N|safe]`",
+            "location": f"{profile.name} consumers have {branch_count} explicit branch points total",
+            "current_state": f"Branch explosion: {branch_count} branches = {effective} effective codepaths",
+            "recommended_change": "Wrap the aggregate in a generational handle (index + generation). Validation is one comparison; mismatch returns the nil sentinel. Reduces N lifetime branches to 1 handle validation + sentinel return.",
+            "effective_codepaths_before": effective,
+            "effective_codepaths_after": len(profile.consumers),
+        })
+    return suggestions
+
+
+def render_ssdl_sketch(profile: AggregateProfile, src_dir: str = "src") -> str:
+    """Render an SSDL sketch of one aggregate's access pattern.
+
+    The sketch shows:
+    - Producers (queries that fetch the aggregate)
+    - Consumers (instruction sequences that read the aggregate)
+    - Branch points (B)
+    - Defusing opportunities (N)
+    - Effective codepaths metric
+    """
+    if profile.is_candidate:
+        return f"## SSDL Sketch for {profile.name}\n\n_(placeholder; candidate aggregate, would be detected after any_type_componentization_20260621)_\n"
+    lines: list[str] = [f"## SSDL Sketch for `{profile.name}`", ""]
+    lines.append("```")
+    lines.append(f"[Q:{profile.name} entry-point] -> [Q:PCG lookup]")
+    nil_check_funcs = [f for f in profile.consumers if detect_nil_check_pattern(f, src_dir)]
+    branches_total = 0
+    for i, fref in enumerate(profile.consumers):
+        b = count_branches_in_function(fref, src_dir)
+        branches_total += b
+        is_nil = fref in nil_check_funcs
+        nil_marker = "[B:is None?]" if is_nil else "[B:check]"
+        nil_defuse = "[N:safe]" if is_nil else ""
+        lines.append(f" -> [{i+1}: {fref.fqname.rsplit('.', 1)[-1]}] {nil_marker} (branches={b}) {nil_defuse}")
+    lines.append(" -> [T:done]")
+    lines.append("```")
+    lines.append("")
+    effective = compute_effective_codepaths(profile, src_dir)
+    lines.append(f"**Effective codepaths:** {effective} (sum of 2^branches across {len(profile.consumers)} consumers)")
+    lines.append(f"**Total branch points:** {branches_total}")
+    lines.append(f"**Nil-check functions:** {len(nil_check_funcs)}")
+    lines.append("")
+    suggestions = suggest_defusing_technique(profile, src_dir)
+    if suggestions:
+        lines.append("**Defusing opportunities:**")
+        lines.append("")
+        for s in suggestions:
+            lines.append(f"- **{s['technique']}**: {s['recommended_change']}")
+            lines.append(f" - Effective codepaths: {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
+    else:
+        lines.append("**No SSDL defusing opportunities detected** (the aggregate is already well-structured for data-oriented access).")
+    lines.append("")
+    return "\n".join(lines)
+
+
+def render_ssdl_rollup(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
+    """Render the SSDL rollup (all aggregates + their defusing opportunities)."""
+    lines: list[str] = ["# SSDL Analysis Rollup", ""]
+    lines.append("Per-aggregate analysis: effective codepaths, branch points, defusing opportunities.")
+    lines.append("")
+    real_profiles = [p for p in profiles if not p.is_candidate]
+    lines.append("## Effective codepaths ranking")
+    lines.append("")
+    lines.append("| Aggregate | Consumers | Total branches | Effective codepaths | Field efficiency |")
+    lines.append("|---|---|---|---|---|")
+    ranked = sorted(real_profiles, key=lambda p: -compute_effective_codepaths(p, src_dir))
+    for p in ranked:
+        ec = compute_effective_codepaths(p, src_dir)
+        tc = sum(count_branches_in_function(f, src_dir) for f in p.consumers)
+        eff = compute_field_access_efficiency(p) * 100
+        lines.append(f"| `{p.name}` | {len(p.consumers)} | {tc} | {ec} | {eff:.0f}% |")
+    lines.append("")
+    lines.append("## Defusing recommendations (top 10)")
+    lines.append("")
+    all_suggestions: list[tuple[AggregateProfile, dict]] = []
+    for p in real_profiles:
+        for s in suggest_defusing_technique(p, src_dir):
+            all_suggestions.append((p, s))
+    all_suggestions.sort(key=lambda ps: -(ps[1]['effective_codepaths_before'] - ps[1]['effective_codepaths_after']))
+    for p, s in all_suggestions[:10]:
+        lines.append(f"### `{p.name}` - {s['technique']}")
+        lines.append("")
+        lines.append(f"- **Location:** {s['location']}")
+        lines.append(f"- **Current state:** {s['current_state']}")
+        lines.append(f"- **Recommended change:** {s['recommended_change']}")
+        lines.append(f"- **Effective codepaths:** {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
+        lines.append("")
+    return "\n".join(lines)
+
+
+def render_organization_deductions(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
+    """Render the organization deductions rollup (where the codebase is well-organized vs needs restructuring)."""
+    lines: list[str] = ["# Organization Deductions", ""]
+    lines.append("Cross-aggregate view of codebase organization. Based on SSDL principles:")
+    lines.append("- Well-organized: few branches, high field efficiency, few effective codepaths")
+    lines.append("- Needs restructuring: many branches, low efficiency, branch-explosion risk")
+    lines.append("")
+    real_profiles = [p for p in profiles if not p.is_candidate]
+    lines.append("## Module organization observations")
+    lines.append("")
+    lines.append("### Files with most cross-aggregate involvement")
+    lines.append("")
+    file_agg: dict[str, set[str]] = {}
+    file_consumers: dict[str, set[str]] = {}
+    for p in real_profiles:
+        for f in p.producers:
+            file_agg.setdefault(f.file, set()).add(p.name)
+        for f in p.consumers:
+            file_consumers.setdefault(f.file, set()).add(p.name)
+    rows = []
+    for f in sorted(file_agg.keys()):
+        rows.append((f, len(file_agg[f]), len(file_consumers.get(f, set()))))
+    rows.sort(key=lambda r: -(r[1] + r[2]))
+    lines.append("| file | aggregates produced | aggregates consumed |")
+    lines.append("|---|---|---|")
+    for f, pc, cc in rows[:15]:
+        lines.append(f"| `{f}` | {pc} | {cc} |")
+    lines.append("")
+    lines.append("### Files with high coupling (both many producers AND many consumers)")
+    lines.append("")
+    lines.append("These files are the central nervous system of the codebase. Changes here ripple across the most aggregates.")
+    lines.append("")
+    lines.append("| file | coupling score (producers + consumers) |")
+    lines.append("|---|---|")
+    for f, pc, cc in rows[:10]:
+        score = pc + cc
+        if score >= 8:
+            lines.append(f"| `{f}` | {score} (high) |")
+    lines.append("")
+    lines.append("## Per-aggregate organization verdict")
+    lines.append("")
+    lines.append("| Aggregate | Verdict |")
+    lines.append("|---|---|")
+    for p in real_profiles:
+        ec = compute_effective_codepaths(p, src_dir)
+        eff = compute_field_access_efficiency(p) * 100
+        nil_count = sum(1 for f in p.consumers if detect_nil_check_pattern(f, src_dir))
+        if ec <= 50 and eff >= 50:
+            verdict = "well-organized"
+        elif ec > 200 or eff < 20:
+            verdict = "needs restructuring"
+        else:
+            verdict = "moderate"
+        notes = []
+        if nil_count > 0:
+            notes.append(f"{nil_count} nil checks")
+        if eff < 50:
+            notes.append(f"{eff:.0f}% field efficiency")
+        note_str = "; ".join(notes) if notes else "no major issues"
+        lines.append(f"| `{p.name}` | {verdict} ({note_str}) |")
+    lines.append("")
+    lines.append("## Restructuring routes (prioritized)")
+    lines.append("")
+    priority_routes = []
+    for p in real_profiles:
+        ec = compute_effective_codepaths(p, src_dir)
+        eff = compute_field_access_efficiency(p)
+        if ec > 100 or eff < 0.3:
+            priority_routes.append((p, ec, eff))
+    priority_routes.sort(key=lambda r: -r[1])
+    if priority_routes:
+        lines.append("Top restructuring routes (by effective codepath count):")
+        lines.append("")
+        for p, ec, eff in priority_routes[:5]:
+            lines.append(f"1. **`{p.name}`**: {ec} effective codepaths ({eff*100:.0f}% field efficiency)")
+            lines.append(f" - Apply nil sentinel to {sum(1 for f in p.consumers if detect_nil_check_pattern(f, src_dir))} nil-check functions")
+            lines.append(f" - Migrate to immediate-mode cache for the {p.type_alias_coverage.total_sites} field-access sites")
+    else:
+        lines.append("_(no high-priority restructuring routes; all aggregates have moderate effective codepath counts)_")
+    lines.append("")
+    return "\n".join(lines)
\ No newline at end of file