5ac0618a33
The 7 code_path_audit*.py files (2604 lines total) are pure static analysis tools. They do AST traversal of src/, no intrusive profiling, no runtime markers. They were inlaid with src/ but only import: - src.result_types (the Result[T] convention type) - each other (the 6 siblings) After the move: - src/ is now pure application code; line-count audit metrics are clean - scripts/code_path_audit/ is a new namespace-isolated subdir per AGENTS.md 'scripts are namespace-isolated by directory' rule TIER-3 READ AGENTS.md + conductor/workflow.md + conductor/edit_workflow.md + conductor/code_styleguides/code_path_audit.md + the 7 files before this commit. Changes: - 7 files moved: src/code_path_audit*.py -> scripts/code_path_audit/ - 7 files updated: internal imports from src.code_path_audit_X -> from code_path_audit_X (siblings in same subdir) - 7 files updated: add sys.path.insert(0, str(Path(__file__).resolve().parents[2] / 'src')) to find src.result_types when run standalone - 5 test files updated: from src.code_path_audit -> from code_path_audit + sys.path setup to find the new subdir - 6 throwaway scripts in scripts/tier2/artifacts/ updated: import path + sys.path setup (parents[3] / 'src' + parents[3] / 'scripts' / 'code_path_audit') - 2 styleguide/spec references updated: conductor/code_styleguides/code_path_audit.md + conductor/tracks/code_path_audit_20260607/spec_v2.md - 1 meta-audit docstring updated: scripts/audit_code_path_audit_coverage.py - 1 type registry entry deleted: docs/type_registry/src_code_path_audit.md (the type is no longer in src/) - 1 type registry index updated: docs/type_registry/index.md (22 files, was 23) Verification: - 7/7 audit gates pass --strict (weak_types 102<=112, type_registry 22 files, main_thread_imports OK, no_models_config_io OK, code_path_audit_coverage 0 violations, exception_handling 0 violations, optional_in_3_files 0 violations) - 6/6 test files pass: test_code_path_audit, test_code_path_audit_integration, 
test_code_path_audit_phase78, test_code_path_audit_phase89, test_code_path_audit_ssdl_behavioral, test_metadata_nil_sentinel - src/ line count: 29997 lines (down from 32621 = -2624 lines) - scripts/code_path_audit/ line count: 2620 lines
357 lines
15 KiB
Python
357 lines
15 KiB
Python
"""SSDL analysis for code_path_audit v2.
|
|
|
|
Translates per-aggregate findings into SSDL (Spec/Sketch Description
|
|
Language) sketches + computes "effective codepaths" + suggests
|
|
specific defusing techniques per aggregate.
|
|
|
|
This is the layer that produces real DEDUCTIONS on codebase
|
|
organization: not just "this is a fat struct" but "this branch
|
|
explosion can be defused by introducing a nil sentinel here".
|
|
"""
|
|
from __future__ import annotations
|
|
import sys
|
|
from pathlib import Path
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
|
|
import ast
|
|
from code_path_audit import (
|
|
AggregateProfile,
|
|
FunctionRef,
|
|
)
|
|
|
|
|
|
# Legend for the single-letter SSDL node tags used in the rendered sketches:
# tag letter -> human-readable description of the primitive.
SSDL_PRIMITIVES: dict[str, str] = {
    "I": "Instruction (single unit of computation)",
    "T": "Terminator (returns/exits)",
    "B": "Branch (conditional fork)",
    "M": "Merge (control flow reconverges)",
    "Q": "State Query (reads persistent state)",
    "S": "State Mutation (writes persistent state)",
    "N": "Nil Sentinel (defuses branches)",
}
|
|
|
|
|
|
def _resolve_filepath(fref: FunctionRef, src_dir: str) -> Path | None:
    """Locate the source file referenced by ``fref``.

    ``fref.file`` is tried as given first (absolute, or relative to the
    current working directory); if that path does not exist it is retried
    relative to ``src_dir``.

    Returns the first existing path, or ``None`` when neither exists.
    """
    as_given = Path(fref.file)
    if as_given.exists():
        return as_given
    under_src = Path(src_dir) / fref.file
    if under_src.exists():
        return under_src
    return None
|
|
|
|
|
|
def compute_effective_codepaths(profile: AggregateProfile, src_dir: str = "src") -> int:
    """Return the effective codepath count for one aggregate.

    The metric is the sum, over every consumer function of the aggregate,
    of ``2 ** branch_count`` where ``branch_count`` comes from
    ``count_branches_in_function``. It is the combinatoric explosion metric
    (Fleury): large values signal branch-explosion risk, which nil sentinels
    or immediate-mode caches are meant to bring down to ~1.

    Candidate aggregates (``profile.is_candidate``) always score 0.
    """
    if profile.is_candidate:
        return 0
    return sum(
        2 ** count_branches_in_function(consumer, src_dir)
        for consumer in profile.consumers
    )
|
|
|
|
|
|
def count_branches_in_function(fref: FunctionRef, src_dir: str = "src") -> int:
    """Count the explicit branch points in the function named by ``fref``.

    The file is located with ``_resolve_filepath`` and parsed with ``ast``.
    The first (sync or async) function definition found by ``ast.walk``
    whose name equals the last dotted component of ``fref.fqname`` is
    inspected; everything nested inside it is included.

    Counted as one branch point each: ``if`` (``elif`` is a nested ``if``),
    ``for`` / ``async for``, ``while``, ``with`` / ``async with``, ``try``,
    and every ``except`` handler. A boolean expression with *k* operands
    (``a and b and c``) adds ``k - 1``.

    Returns 0 when the file cannot be found, read, decoded or parsed, or
    when no function with that name exists in it.
    """
    filepath = _resolve_filepath(fref, src_dir)
    if filepath is None:
        return 0
    try:
        tree = ast.parse(filepath.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError (file is not valid UTF-8) and
        # the null-byte error ast.parse may raise; one unreadable file must
        # not abort the whole audit.
        return 0

    func_name = fref.fqname.rsplit(".", 1)[-1]
    # Async loop/context-manager nodes are included so that coroutines are
    # scored the same way as their synchronous equivalents.
    branch_nodes = (
        ast.If,
        ast.For,
        ast.AsyncFor,
        ast.While,
        ast.With,
        ast.AsyncWith,
        ast.Try,
        ast.ExceptHandler,
    )
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if node.name != func_name:
            continue
        count = 0
        for sub in ast.walk(node):
            if isinstance(sub, branch_nodes):
                count += 1
            elif isinstance(sub, ast.BoolOp):
                count += len(sub.values) - 1
        # Only the first definition with a matching name is scored.
        return count
    return 0
|
|
|
|
|
|
def detect_nil_check_pattern(fref: FunctionRef, src_dir: str = "src") -> bool:
    """Report whether the function named by ``fref`` compares against ``None``.

    A nil check is a branch that a nil sentinel could defuse. Any comparison
    expression with a literal ``None`` operand counts: ``x is None``,
    ``x is not None``, ``x == None``, ``x != None`` and the reversed
    spellings such as ``None == x``.

    The file is located with ``_resolve_filepath``; only the first (sync or
    async) function definition whose name equals the last dotted component
    of ``fref.fqname`` is inspected, including anything nested inside it.

    Returns ``False`` when the file cannot be found, read, decoded or
    parsed, or when no function with that name exists in it.
    """
    filepath = _resolve_filepath(fref, src_dir)
    if filepath is None:
        return False
    try:
        tree = ast.parse(filepath.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError (file is not valid UTF-8) and
        # the null-byte error ast.parse may raise; treat as "no nil checks".
        return False

    func_name = fref.fqname.rsplit(".", 1)[-1]
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if node.name != func_name:
            continue
        for sub in ast.walk(node):
            if not isinstance(sub, ast.Compare):
                continue
            # Look at the left operand too so `None == x` is recognised.
            for operand in (sub.left, *sub.comparators):
                if isinstance(operand, ast.Constant) and operand.value is None:
                    return True
        # Only the first definition with a matching name is inspected.
        return False
    return False
|
|
|
|
|
|
def compute_field_access_efficiency(profile: AggregateProfile) -> float:
    """Return the share of typed accesses among all access sites.

    The value is ``typed_sites / total_sites`` taken from
    ``profile.type_alias_coverage``.

    High efficiency (>0.7) means consumers are using the typed fields
    directly. Low efficiency (<0.3) means consumers are using wildcards or
    the aggregate is being passed through without field use (candidate for
    immediate-mode).

    Special cases: candidate aggregates report 1.0 without consulting the
    coverage data; an aggregate with zero recorded sites reports 0.0.
    """
    if profile.is_candidate:
        return 1.0
    coverage = profile.type_alias_coverage
    total = coverage.total_sites
    return coverage.typed_sites / total if total != 0 else 0.0
|
|
|
|
|
|
def suggest_defusing_technique(profile: AggregateProfile, src_dir: str = "src") -> list[dict]:
    """Suggest specific SSDL defusing techniques for this aggregate.

    Returns a list of dicts with the keys ``technique``, ``location``,
    ``current_state``, ``recommended_change``,
    ``effective_codepaths_before`` and ``effective_codepaths_after``.
    Candidate aggregates get an empty list.

    Up to three suggestions are produced, in this order:

    * Nil Sentinel - when at least one consumer contains a ``None`` check.
    * Immediate-Mode Cache - when field-access efficiency is below 0.3.
    * Generational Handles - when the consumers have more than 20 branch
      points in total.

    ``effective_codepaths_after`` is a rough estimate; it never exceeds
    ``effective_codepaths_before``.
    """
    suggestions: list[dict] = []
    if profile.is_candidate:
        return suggestions

    nil_check_count = sum(1 for f in profile.consumers if detect_nil_check_pattern(f, src_dir))
    effective = compute_effective_codepaths(profile, src_dir)
    efficiency = compute_field_access_efficiency(profile)
    branch_count = sum(count_branches_in_function(f, src_dir) for f in profile.consumers)

    if nil_check_count > 0:
        suggestions.append({
            "technique": "Nil Sentinel `[N]`",
            "location": f"{nil_check_count} consumer function{'s' if nil_check_count != 1 else ''} have `is None` / `== None` checks",
            "current_state": f"{nil_check_count} nil-check branches contribute to branch explosion",
            "recommended_change": "Introduce a module-level `NIL_<AGGREGATE>` sentinel whose field accesses return safe defaults. Replace None checks with the sentinel. Collapses 2^branch_count into ~1.",
            "effective_codepaths_before": effective,
            "effective_codepaths_after": max(1, effective - nil_check_count * 2),
        })

    if efficiency < 0.3:
        coverage = profile.type_alias_coverage
        suggestions.append({
            "technique": "Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]`",
            "location": f"{profile.name} consumers access {coverage.total_sites} sites, only {coverage.typed_sites} typed ({efficiency*100:.0f}%)",
            "current_state": "Many consumers use wildcard or defensive access patterns",
            "recommended_change": f"Introduce a `{profile.name.lower()}_cache` keyed lookup. Consumers request by key, get cached value, no field-existence checks. Reduces {coverage.total_sites} field-check branches to 1 cache lookup.",
            "effective_codepaths_before": effective,
            # The site count can exceed the current codepath count; clamp so
            # a "defusing" suggestion never reports an increase (which would
            # also yield a negative reduction when suggestions are ranked).
            "effective_codepaths_after": min(effective, max(1, coverage.total_sites)),
        })

    if branch_count > 20:
        suggestions.append({
            "technique": "Generational Handles `[I:ResolveHandle] -> [B:Gen matches?] -> [N|safe]`",
            "location": f"{profile.name} consumers have {branch_count} explicit branch points total",
            "current_state": f"Branch explosion: {branch_count} branches = {effective} effective codepaths",
            "recommended_change": "Wrap the aggregate in a generational handle (index + generation). Validation is one comparison; mismatch returns the nil sentinel. Reduces N lifetime branches to 1 handle validation + sentinel return.",
            "effective_codepaths_before": effective,
            "effective_codepaths_after": len(profile.consumers),
        })

    return suggestions
|
|
|
|
|
|
def render_ssdl_sketch(profile: AggregateProfile, src_dir: str = "src") -> str:
    """Render an SSDL sketch of one aggregate's access pattern as Markdown.

    The sketch contains:

    - a fenced block with one line per consumer function, showing its
      branch count (B) and, for functions with ``None`` checks, the nil
      sentinel defusing marker (N)
    - the effective codepaths metric, total branch points and the number of
      nil-check functions
    - the defusing opportunities from ``suggest_defusing_technique``, or a
      note that none were detected

    Candidate aggregates get a short placeholder instead.
    """
    if profile.is_candidate:
        return f"## SSDL Sketch for {profile.name}\n\n_(placeholder; candidate aggregate)_\n"

    lines: list[str] = [f"## SSDL Sketch for `{profile.name}`", ""]
    lines.append("```")
    lines.append(f"[Q:{profile.name} entry-point] -> [Q:PCG lookup]")

    # One nil-check flag per consumer, computed once; avoids a linear
    # membership scan (and reliance on FunctionRef equality) per consumer.
    nil_flags = [detect_nil_check_pattern(f, src_dir) for f in profile.consumers]
    nil_function_count = sum(1 for flag in nil_flags if flag)

    branches_total = 0
    for position, (fref, is_nil) in enumerate(zip(profile.consumers, nil_flags), 1):
        b = count_branches_in_function(fref, src_dir)
        branches_total += b
        nil_marker = "[B:is None?]" if is_nil else "[B:check]"
        nil_defuse = "[N:safe]" if is_nil else ""
        short_name = fref.fqname.rsplit(".", 1)[-1]
        lines.append(f" -> [{position}: {short_name}] {nil_marker} (branches={b}) {nil_defuse}")
    lines.append(" -> [T:done]")
    lines.append("```")
    lines.append("")

    effective = compute_effective_codepaths(profile, src_dir)
    lines.append(f"**Effective codepaths:** {effective} (sum of 2^branches across {len(profile.consumers)} consumers)")
    lines.append(f"**Total branch points:** {branches_total}")
    lines.append(f"**Nil-check functions:** {nil_function_count}")
    lines.append("")

    suggestions = suggest_defusing_technique(profile, src_dir)
    if suggestions:
        lines.append("**Defusing opportunities:**")
        lines.append("")
        for s in suggestions:
            lines.append(f"- **{s['technique']}**: {s['recommended_change']}")
            lines.append(f" - Effective codepaths: {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
    else:
        lines.append("**No SSDL defusing opportunities detected** (the aggregate is already well-structured for data-oriented access).")
    lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def render_ssdl_rollup(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the SSDL rollup (all aggregates + their defusing opportunities).

    The Markdown output has two sections:

    * a ranking table of every non-candidate aggregate, ordered by
      descending effective codepaths (ties keep input order)
    * the ten defusing suggestions with the largest estimated reduction in
      effective codepaths, or a placeholder line when there are none

    Candidate aggregates are ignored.
    """
    lines: list[str] = ["# SSDL Analysis Rollup", ""]
    lines.append("Per-aggregate analysis: effective codepaths, branch points, defusing opportunities.")
    lines.append("")
    real_profiles = [p for p in profiles if not p.is_candidate]

    lines.append("## Effective codepaths ranking")
    lines.append("")
    lines.append("| Aggregate | Consumers | Total branches | Effective codepaths | Field efficiency |")
    lines.append("|---|---|---|---|---|")
    # Score each aggregate once: the metric re-reads and re-parses every
    # consumer file, so it must not be evaluated again per table row.
    scored = [(compute_effective_codepaths(p, src_dir), p) for p in real_profiles]
    scored.sort(key=lambda pair: -pair[0])  # stable: ties keep input order
    for ec, p in scored:
        tc = sum(count_branches_in_function(f, src_dir) for f in p.consumers)
        eff = compute_field_access_efficiency(p) * 100
        lines.append(f"| `{p.name}` | {len(p.consumers)} | {tc} | {ec} | {eff:.0f}% |")
    lines.append("")

    lines.append("## Defusing recommendations (top 10)")
    lines.append("")
    all_suggestions: list[tuple[AggregateProfile, dict]] = [
        (p, s)
        for p in real_profiles
        for s in suggest_defusing_technique(p, src_dir)
    ]
    # Largest estimated reduction (before - after) first.
    all_suggestions.sort(key=lambda ps: -(ps[1]['effective_codepaths_before'] - ps[1]['effective_codepaths_after']))
    if not all_suggestions:
        lines.append("_(no defusing recommendations detected)_\n")
        return "\n".join(lines)

    for p, s in all_suggestions[:10]:
        lines.append(f"### `{p.name}` - {s['technique']}")
        lines.append("")
        lines.append(f"- **Location:** {s['location']}")
        lines.append(f"- **Current state:** {s['current_state']}")
        lines.append(f"- **Recommended change:** {s['recommended_change']}")
        lines.append(f"- **Effective codepaths:** {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
        lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def render_organization_deductions(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the organization deductions rollup as Markdown.

    Cross-aggregate view of codebase organization. Based on SSDL principles:
    - Well-organized: few branches, high field efficiency, few effective codepaths
    - Needs restructuring: many branches, low efficiency, branch-explosion risk

    Sections, in order:

    * per-file tables: how many distinct aggregates each file produces and
      consumes (top 15), and the files whose combined count is >= 8
    * a verdict per aggregate with a tally
    * up to five prioritized restructuring routes (aggregates with more
      than 100 effective codepaths or below 30% field efficiency)

    Candidate aggregates are ignored.
    """
    lines: list[str] = ["# Organization Deductions", ""]
    lines.append("Cross-aggregate view of codebase organization. Verdicts derived from SSDL analysis:")
    lines.append("- **well-organized**: <=50 effective codepaths AND >=50% field efficiency")
    lines.append("- **moderate**: between the two thresholds")
    lines.append("- **needs restructuring**: >200 effective codepaths OR <20% field efficiency")
    lines.append("")
    real_profiles = [p for p in profiles if not p.is_candidate]

    lines.append("## Module organization observations")
    lines.append("")
    lines.append("### Files with most cross-aggregate involvement")
    lines.append("")
    produced_by_file: dict[str, set[str]] = {}
    consumed_by_file: dict[str, set[str]] = {}
    for p in real_profiles:
        for f in p.producers:
            produced_by_file.setdefault(f.file, set()).add(p.name)
        for f in p.consumers:
            consumed_by_file.setdefault(f.file, set()).add(p.name)
    # Walk the union of both maps: a file that only consumes aggregates is
    # still involved and must not be dropped from the tables.
    rows: list[tuple[str, int, int]] = [
        (f, len(produced_by_file.get(f, ())), len(consumed_by_file.get(f, ())))
        for f in sorted(produced_by_file.keys() | consumed_by_file.keys())
    ]
    rows.sort(key=lambda r: -(r[1] + r[2]))  # stable: ties stay alphabetical
    lines.append("| file | aggregates produced | aggregates consumed |")
    lines.append("|---|---|---|")
    for f, pc, cc in rows[:15]:
        lines.append(f"| `{f}` | {pc} | {cc} |")
    lines.append("")
    lines.append("### Files with high coupling (producers + consumers >= 8)")
    lines.append("")
    lines.append("These files are the central nervous system of the codebase. Changes ripple across the most aggregates.")
    lines.append("")
    lines.append("| file | coupling score (producers + consumers) |")
    lines.append("|---|---|")
    for f, pc, cc in rows:
        if (pc + cc) >= 8:
            lines.append(f"| `{f}` | {pc + cc} (high) |")
    lines.append("")

    # Compute every per-aggregate metric once; each one re-reads and
    # re-parses the consumer files, and both sections below need them.
    metrics: list[tuple[AggregateProfile, int, float, int]] = []
    for p in real_profiles:
        ec = compute_effective_codepaths(p, src_dir)
        ratio = compute_field_access_efficiency(p)
        nil_count = sum(1 for f in p.consumers if detect_nil_check_pattern(f, src_dir))
        metrics.append((p, ec, ratio, nil_count))

    lines.append("## Per-aggregate organization verdict")
    lines.append("")
    lines.append("| Aggregate | Verdict | Notes |")
    lines.append("|---|---|---|")
    verdict_counts = {"well-organized": 0, "moderate": 0, "needs restructuring": 0}
    for p, ec, ratio, nil_count in metrics:
        eff = ratio * 100
        if ec <= 50 and eff >= 50:
            verdict = "well-organized"
        elif ec > 200 or eff < 20:
            verdict = "needs restructuring"
        else:
            verdict = "moderate"
        verdict_counts[verdict] += 1
        notes: list[str] = []
        if nil_count > 0:
            notes.append(f"{nil_count} nil checks")
        if eff < 50:
            notes.append(f"{eff:.0f}% field efficiency")
        if ec > 100:
            notes.append(f"{ec} effective codepaths")
        note_str = "; ".join(notes) if notes else "no major issues"
        lines.append(f"| `{p.name}` | {verdict} | {note_str} |")
    lines.append("")
    lines.append(f"**Tally:** {verdict_counts['well-organized']} well-organized, {verdict_counts['moderate']} moderate, {verdict_counts['needs restructuring']} needs restructuring")
    lines.append("")

    lines.append("## Restructuring routes (prioritized)")
    lines.append("")
    priority_routes = [m for m in metrics if m[1] > 100 or m[2] < 0.3]
    priority_routes.sort(key=lambda r: -r[1])  # most effective codepaths first
    if priority_routes:
        lines.append("Top restructuring routes (by effective codepath count):")
        lines.append("")
        for i, (p, ec, eff, nil_count) in enumerate(priority_routes[:5], 1):
            lines.append(f"{i}. **`{p.name}`**: {ec} effective codepaths ({eff*100:.0f}% field efficiency)")
            lines.append(f" - Apply nil sentinel to {nil_count} nil-check functions")
            lines.append(f" - Migrate to immediate-mode cache for {p.type_alias_coverage.total_sites} field-access sites")
    else:
        lines.append("_(no high-priority restructuring routes; all aggregates have moderate effective codepath counts)_")
    lines.append("")
    return "\n".join(lines)
|