Private
Public Access
0
0
Files
manual_slop/src/code_path_audit.py
T

1245 lines
44 KiB
Python

"""Code path & data pipeline audit tool v2.
Builds a Producer-Consumer Graph (PCG) of src/, classifies each
data aggregate by MemoryDim, detects the access pattern (APD),
estimates the call frequency (CFE), computes the decomposition
cost (componentize vs unify), cross-validates with 6 existing
audit scripts, and emits per-aggregate profiles in the v2 custom
postfix DSL + markdown + prefix tree text. See
conductor/tracks/code_path_audit_20260607/spec_v2.md.
"""
from __future__ import annotations
import ast
import tomllib
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
from src.result_types import Result, ErrorInfo, ErrorKind
# Closed vocabularies shared by the audit. These are typing.Literal aliases:
# static checkers flag misspelled values, nothing is enforced at runtime.

# How an aggregate is declared in the source tree.
AggregateKind = Literal["typealias", "dataclass", "candidate_dataclass", "builtin"]

# Memory dimension an aggregate belongs to ("unknown" when unclassified).
MemoryDim = Literal[
    "curation", "discussion", "rag", "knowledge", "config", "control", "unknown",
]

# Access-pattern labels, per function and per aggregate.
AccessPattern = Literal[
    "whole_struct", "field_by_field", "hot_cold_split", "bulk_batched", "mixed",
]

# Call-frequency buckets.
Frequency = Literal[
    "hot", "per_turn", "per_discussion", "per_request", "cold", "init", "unknown",
]

# Outcome of the decomposition-cost analysis.
RecommendedDirection = Literal["componentize", "unify", "hold", "insufficient_data"]
@dataclass(frozen=True)
class FunctionRef:
    """Immutable reference to a function that produces or consumes an aggregate.

    Frozen (and therefore hashable), so instances can be stored in the
    sets held by ProducerConsumerGraph.
    """
    # Dotted name; build_pcg forms it as the file path without ".py" (path
    # separators turned into dots) + "." + the function name.
    fqname: str
    # Source file path, as str(Path) of the scanned file in build_pcg.
    file: str
    # Line number; find_enclosing_function compares it, but build_pcg always stores 0.
    line: int
    # "producer" or "consumer" in the refs created by build_pcg.
    role: str
@dataclass(frozen=True)
class AccessPatternEvidence:
    """One function's observed access pattern for an aggregate.

    NOTE(review): ``field_accesses`` is a dict, so although the class is
    frozen, hashing an instance raises TypeError (dicts are unhashable).
    """
    # Function the evidence was gathered from.
    function: FunctionRef
    # Pattern label detected for that function.
    pattern: AccessPattern
    # Field name -> access count; to_dsl_v2 serializes only the number of entries.
    field_accesses: dict[str, int]
    # Free-form confidence label; to_dsl_v2 writes it as a quoted string.
    confidence: str
@dataclass(frozen=True)
class FrequencyEvidence:
    """One function's estimated call frequency and where the estimate came from."""
    # Function the estimate applies to.
    function: FunctionRef
    # Estimated frequency bucket.
    frequency: Frequency
    # Origin of the estimate (presumably override vs entry-point detection — TODO confirm).
    source: str
    # Optional free-form note; empty by default.
    note: str = ""
@dataclass(frozen=True)
class ResultCoverage:
    """How much of an aggregate's pipeline uses Result[T] (see compute_result_coverage)."""
    # Number of producer functions.
    total_producers: int
    # Producers counted as returning Result[T]; compute_result_coverage sets this
    # equal to total_producers (filtering is the caller's responsibility).
    result_producers: int
    # Number of consumer functions.
    total_consumers: int
    # Distinct consumer fqnames that branch on .errors.
    result_consumers: int
    # Human-readable one-line summary with percentages.
    summary: str
@dataclass(frozen=True)
class TypeAliasCoverage:
    """Typed vs untyped usage sites of an aggregate (see compute_type_alias_coverage)."""
    # All sites considered.
    total_sites: int
    # Sites using the typed alias/dataclass.
    typed_sites: int
    # total_sites - typed_sites (not clamped; negative if typed_sites > total_sites).
    untyped_sites: int
    # Human-readable one-line summary with percentages.
    summary: str
@dataclass(frozen=True)
class CrossAuditFinding:
    """Summary of one external audit script's findings for an aggregate."""
    # Audit name, e.g. "audit_weak_types" (a key of INPUT_JSON_CONTRACTS).
    audit_script: str
    # Number of findings; aggregate_cross_audit_findings uses len(findings).
    site_count: int
    # Representative location of a finding.
    example_file: str
    example_line: int
    # Free-form note; empty by default.
    note: str = ""
@dataclass(frozen=True)
class CrossAuditFindings:
    """Cross-audit findings for one aggregate, bucketed by audit script.

    Each field is a tuple of findings from one audit; an empty tuple means
    that audit reported nothing (or was not run).
    """
    # From audit_weak_types.
    weak_types: tuple[CrossAuditFinding, ...]
    # From audit_exception_handling.
    exception_handling: tuple[CrossAuditFinding, ...]
    # From audit_optional_in_3_files.
    optional_in_baseline: tuple[CrossAuditFinding, ...]
    # From audit_no_models_config_io.
    config_io_ownership: tuple[CrossAuditFinding, ...]
    # From audit_main_thread_imports.
    import_graph: tuple[CrossAuditFinding, ...]
@dataclass(frozen=True)
class DecompositionCost:
    """Cost-model output for one aggregate (see compute_decomposition_cost)."""
    # per_call_cost_us scaled by FREQUENCY_MULTIPLIER, truncated to int (microseconds).
    current_cost_estimate: int
    # current_cost_estimate * componentize factor; 0 when the factor is <= 0.
    componentize_savings: int
    # current_cost_estimate * unify factor; 0 when the factor is <= 0.
    unify_savings: int
    # Direction chosen by recommended_direction.
    recommended_direction: RecommendedDirection
    # Sentence produced by generate_rationale.
    recommended_rationale: str
    # None when built by compute_decomposition_cost; to_dsl_v2 writes None as "nil".
    batch_size: int | None
    # Inputs echoed back for reporting.
    struct_field_count: int
    struct_frozen: bool
@dataclass(frozen=True)
class OptimizationCandidate:
    """A proposed refactoring for an aggregate."""
    # Description of the candidate change (rendered in the markdown list).
    candidate: str
    # Proposed direction.
    direction: RecommendedDirection
    # Files touched; to_dsl_v2 serializes only the count.
    affected_files: tuple[str, ...]
    # Estimated savings in microseconds.
    estimated_savings_us: int
    # Free-form effort and priority labels, shown in the markdown list.
    effort: str
    priority: str
    # Optional cross-reference; empty by default.
    cross_ref: str = ""
@dataclass(frozen=True)
class AggregateProfile:
    """Complete audit result for one aggregate; rendered by to_dsl_v2, to_markdown and to_tree."""
    # Aggregate (type) name.
    name: str
    # How the aggregate is declared.
    aggregate_kind: AggregateKind
    # Memory dimension the aggregate is assigned to.
    memory_dim: MemoryDim
    # Functions that produce / consume the aggregate.
    producers: tuple[FunctionRef, ...]
    consumers: tuple[FunctionRef, ...]
    # Aggregate-level access pattern and the per-function evidence behind it.
    access_pattern: AccessPattern
    access_pattern_evidence: tuple[AccessPatternEvidence, ...]
    # Aggregate-level call frequency and the per-function evidence behind it.
    frequency: Frequency
    frequency_evidence: tuple[FrequencyEvidence, ...]
    # Result[T] adoption and typed-alias adoption summaries.
    result_coverage: ResultCoverage
    type_alias_coverage: TypeAliasCoverage
    # Findings merged from the external audit scripts.
    cross_audit_findings: CrossAuditFindings
    # Cost model output and recommended direction.
    decomposition_cost: DecompositionCost
    # Proposed refactorings.
    optimization_candidates: tuple[OptimizationCandidate, ...]
    # Whether the aggregate is flagged as a candidate (rendered as true/false in the DSL).
    is_candidate: bool
    # Optional pre-rendered outputs; empty by default. NOTE(review): no renderer in this
    # part of the file fills these in — confirm who populates them.
    mermaid: str = ""
    markdown: str = ""
@dataclass
class ProducerConsumerGraph:
    """Bipartite graph linking aggregate names to the functions that touch them.

    ``producers`` / ``consumers`` map an aggregate name to the set of
    FunctionRefs producing / consuming it; a name's set is created on first
    use. ``edges`` is declared for (aggregate, function) pairs, but nothing
    in this class writes to it.
    """
    edges: dict[tuple[str, str], set[str]] = field(default_factory=dict)
    producers: dict[str, set[FunctionRef]] = field(default_factory=dict)
    consumers: dict[str, set[FunctionRef]] = field(default_factory=dict)

    def add_producer(self, aggregate: str, function: FunctionRef) -> None:
        """Record *function* as a producer of *aggregate* (duplicates collapse)."""
        if aggregate not in self.producers:
            self.producers[aggregate] = set()
        self.producers[aggregate].add(function)

    def add_consumer(self, aggregate: str, function: FunctionRef) -> None:
        """Record *function* as a consumer of *aggregate* (duplicates collapse)."""
        if aggregate not in self.consumers:
            self.consumers[aggregate] = set()
        self.consumers[aggregate].add(function)
def P1_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
    """AST pass 1: find producers from function return annotations.

    For every (async) function in *tree* (visited in ``ast.walk`` order),
    emits ``(function_name, aggregate, "producer", "high")`` when the return
    annotation is:

    * ``-> Name``                -> aggregate ``Name``
    * ``-> Result[Name]``        -> aggregate ``Name``
    * ``-> Result[Outer[...]]``  -> aggregate ``Outer`` (the container name)

    Any other annotation (including none, ``-> None``, or a non-Result
    subscript such as ``list[X]``) produces nothing. *file* is unused.
    """
    found: list[tuple[str, str, str, str]] = []
    for fn in ast.walk(tree):
        if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)) or fn.returns is None:
            continue
        annotation = fn.returns
        aggregate_name: str | None = None
        if isinstance(annotation, ast.Name):
            aggregate_name = annotation.id
        elif (
            isinstance(annotation, ast.Subscript)
            and isinstance(annotation.value, ast.Name)
            and annotation.value.id == "Result"
        ):
            inner = annotation.slice
            if isinstance(inner, ast.Name):
                aggregate_name = inner.id
            elif isinstance(inner, ast.Subscript) and isinstance(inner.value, ast.Name):
                aggregate_name = inner.value.id
        if aggregate_name is not None:
            found.append((fn.name, aggregate_name, "producer", "high"))
    return found
def P2_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
    """AST pass 2: find consumers of typed aggregates from parameter annotations.

    For every (async) function in *tree*, each annotated parameter yields
    ``(function_name, aggregate, "consumer", "high")`` when its annotation is
    a bare name (``x: Foo`` -> ``Foo``) or a ``list``/``List`` of a bare name
    (``x: list[Foo]`` -> ``Foo``). Other annotations are ignored.

    Parameters are examined in signature order: positional-only, regular,
    ``*args``, keyword-only, ``**kwargs``. Positional-only and variadic
    parameters are included so that e.g. ``def f(x: Foo, /)`` or
    ``def f(*items: Foo)`` are recognised as consumers of ``Foo``.
    *file* is unused.
    """
    out: list[tuple[str, str, str, str]] = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        sig = node.args
        params: list[ast.arg] = [*sig.posonlyargs, *sig.args]
        if sig.vararg is not None:
            params.append(sig.vararg)
        params.extend(sig.kwonlyargs)
        if sig.kwarg is not None:
            params.append(sig.kwarg)
        for arg in params:
            ann = arg.annotation
            if ann is None:
                continue
            if isinstance(ann, ast.Name):
                out.append((node.name, ann.id, "consumer", "high"))
            elif (
                isinstance(ann, ast.Subscript)
                and isinstance(ann.value, ast.Name)
                and ann.value.id in ("list", "List")
                and isinstance(ann.slice, ast.Name)
            ):
                out.append((node.name, ann.slice.id, "consumer", "high"))
    return out
def P3_pass(tree: ast.Module, file: str, type_registry: dict[str, list[str]]) -> list[tuple[str, str, str, int]]:
    """AST pass 3: count field accesses per function.

    Within each (async) function, counts two forms on a bare name:
    ``name["key"]`` (string-constant subscript, kind ``"subscript"``) and
    ``name.attr`` (kind ``"attribute"``). Any bare name qualifies, so module
    attributes such as ``ast.walk`` are counted too. Because ``ast.walk``
    descends into nested defs, accesses inside a nested function are counted
    for both it and each enclosing function.

    Returns ``(function_name, key, kind, count)`` tuples, one per distinct
    (kind, key) in first-seen order per function. *file* and *type_registry*
    are accepted but unused.
    """
    results: list[tuple[str, str, str, int]] = []
    for fn in ast.walk(tree):
        if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        tally: Counter = Counter()
        for sub in ast.walk(fn):
            if isinstance(sub, ast.Subscript):
                index = sub.slice
                if (
                    isinstance(sub.value, ast.Name)
                    and isinstance(index, ast.Constant)
                    and isinstance(index.value, str)
                ):
                    tally["subscript", index.value] += 1
            elif isinstance(sub, ast.Attribute) and isinstance(sub.value, ast.Name):
                tally["attribute", sub.attr] += 1
        results.extend((fn.name, key, kind, n) for (kind, key), n in tally.items())
    return results
def build_pcg(src_dir: str, type_registry: dict[str, list[str]] | None = None) -> Result[ProducerConsumerGraph]:
    """Build the ProducerConsumerGraph by AST-walking every ``*.py`` file under *src_dir*.

    Producers come from P1_pass (return annotations) and consumers from
    P2_pass (parameter annotations). Each function becomes a FunctionRef
    whose fqname is the file path without ``.py`` (path separators turned
    into dots) plus ``.`` and the function name; ``line`` is 0 because the
    passes do not report line numbers.

    Files are visited in sorted path order so the graph and the error list
    are deterministic across filesystems. Files inside a ``__pycache__``
    directory are skipped. A file that cannot be read (ErrorKind.INTERNAL)
    or parsed (ErrorKind.INVALID_INPUT) is recorded in ``errors`` and
    skipped; the walk continues with the remaining files.

    Args:
        src_dir: Root directory to scan recursively.
        type_registry: Intended for the field-access pass (P3_pass), whose
            output is not stored in the graph yet; currently unused.

    Returns:
        Result whose ``data`` is the (possibly partial) graph and whose
        ``errors`` holds one ErrorInfo per skipped file.
    """
    pcg = ProducerConsumerGraph()
    errors: list[ErrorInfo] = []
    for py_file in sorted(Path(src_dir).rglob("*.py")):
        if "__pycache__" in py_file.parts:
            continue
        try:
            source = py_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            errors.append(ErrorInfo(
                kind=ErrorKind.INTERNAL,
                message=f"Cannot read {py_file}: {e}",
                source="build_pcg",
                original=e,
            ))
            continue
        try:
            tree = ast.parse(source)
        except (SyntaxError, ValueError) as e:
            # ValueError: older Pythons raise it (not SyntaxError) for source with NUL bytes.
            errors.append(ErrorInfo(
                kind=ErrorKind.INVALID_INPUT,
                message=f"Syntax error in {py_file}: {e}",
                source="build_pcg",
                original=e,
            ))
            continue
        file = str(py_file)
        fqname_prefix = file.removesuffix(".py").replace("/", ".").replace("\\", ".")
        for fn, agg, role, _conf in P1_pass(tree, file):
            if role == "producer":
                pcg.add_producer(agg, FunctionRef(fqname=f"{fqname_prefix}.{fn}", file=file, line=0, role=role))
        for fn, agg, role, _conf in P2_pass(tree, file):
            if role == "consumer":
                pcg.add_consumer(agg, FunctionRef(fqname=f"{fqname_prefix}.{fn}", file=file, line=0, role=role))
        # P3_pass (per-function field-access counts) is not run here: the graph
        # has no place to store its output yet, so running it would only cost time.
    return Result(data=pcg, errors=errors)
# Hand-assigned memory dim for well-known aggregate names. classify_memory_dim
# consults this table after the user overrides and before the file-of-origin
# heuristic.
CANONICAL_MEMORY_DIM: dict[str, MemoryDim] = {
    "Metadata": "discussion",
    "CommsLogEntry": "discussion",
    "CommsLog": "discussion",
    "HistoryMessage": "discussion",
    "History": "discussion",
    "FileItem": "curation",
    "FileItems": "curation",
    "ToolDefinition": "control",
    "ToolCall": "control",
    "Result": "control",
    "ErrorInfo": "control",
    "ToolSpec": "control",
    "ToolParameter": "control",
    "ChatMessage": "discussion",
    "UsageStats": "control",
    "NormalizedResponse": "control",
    "ProviderHistory": "discussion",
    "OpenAICompatibleRequest": "control",
    "Session": "knowledge",
    "SessionMetadata": "knowledge",
    "WebSocketMessage": "control",
    "JsonValue": "control",
    "ManualSlopConfig": "config",
    "VendorCapabilities": "control",
}
# Fallback memory dim keyed by the producing file's path. file_origin_memory_dim
# walks this table in insertion order and returns the first dim with a matching
# path entry; dims absent here ("control", "unknown") are never produced by it.
# NOTE(review): "src/context_presets.py" is listed under both "curation" and
# "config"; because "curation" comes first and the first match wins, the
# "config" entry is never reached — confirm which dim is intended.
MEMORY_DIM_FILE_HEURISTIC: dict[MemoryDim, tuple[str, ...]] = {
    "curation": ("src/aggregate.py", "src/context_presets.py", "src/views.py"),
    "discussion": ("src/ai_client.py", "src/history.py", "src/session_logger.py"),
    "rag": ("src/rag_engine.py", "src/rag_index.py"),
    "knowledge": ("src/knowledge.py", "src/knowledge_curation.py"),
    "config": ("src/paths.py", "src/presets.py", "src/personas.py", "src/context_presets.py", "src/tool_presets.py"),
}
def load_memory_dim_overrides(path: str) -> dict[str, MemoryDim]:
"""Load memory_dim overrides from a TOML file."""
p = Path(path)
if not p.exists():
return {}
with p.open("rb") as f:
data = tomllib.load(f)
out: dict[str, MemoryDim] = {}
for key, value in data.get("memory_dim", {}).items():
if isinstance(value, str):
out[key] = value
return out
def file_origin_memory_dim(
    file: str,
    heuristic: dict[MemoryDim, tuple[str, ...]] | None = None,
) -> MemoryDim:
    """Determine the memory dim from the file an aggregate originates in.

    Args:
        file: Path of the producing file. Backslashes are normalised to
            ``/`` first, so Windows paths (``str(Path)`` on Windows yields
            ``src\\rag_engine.py``) match the POSIX-style table entries.
        heuristic: dim -> path-entries table to consult; defaults to
            MEMORY_DIM_FILE_HEURISTIC.

    Returns:
        The first dim, in table order, with an entry that *file* starts with
        or ends with as whole path components (so ``/repo/src/paths.py``
        matches ``src/paths.py``); "unknown" when nothing matches.
    """
    table = MEMORY_DIM_FILE_HEURISTIC if heuristic is None else heuristic
    normalized = file.replace("\\", "/")
    for dim, entries in table.items():
        for entry in entries:
            # startswith keeps the original repo-relative matching; the "/" + entry
            # suffix check additionally accepts absolute or ./-prefixed paths.
            if normalized.startswith(entry) or normalized.endswith("/" + entry):
                return dim
    return "unknown"
def classify_memory_dim(aggregate: str, primary_producer_file: str, overrides: dict[str, MemoryDim]) -> MemoryDim:
    """Pick the memory dim for *aggregate*.

    Lookup order, first hit wins:
    1. *overrides* (user-supplied, e.g. from load_memory_dim_overrides);
    2. CANONICAL_MEMORY_DIM;
    3. file_origin_memory_dim(*primary_producer_file*), which may itself
       return "unknown".
    """
    if aggregate in overrides:
        return overrides[aggregate]
    canonical = CANONICAL_MEMORY_DIM
    if aggregate not in canonical:
        return file_origin_memory_dim(primary_producer_file)
    return canonical[aggregate]
# Access-pattern detection thresholds.
# is_whole_struct_access: at most this many distinct keys still counts as whole_struct.
WHOLE_STRUCT_KEY_THRESHOLD: int = 1
# is_field_by_field_access: at least this many distinct keys counts as field_by_field.
FIELD_BY_FIELD_KEY_THRESHOLD: int = 3
# NOTE(review): not referenced by any function in this part of the file — confirm it is still used.
MIXED_DOMINANCE_THRESHOLD: float = 0.6
# dominant_pattern: if the top pattern's share of all per-function counts is <= this, report "mixed".
AGGREGATE_LEVEL_DOMINANCE_THRESHOLD: float = 0.25
def is_whole_struct_access(field_counts: Counter, has_direct_access: bool) -> bool:
    """Return True if a function treats the aggregate as a whole.

    This is an OR of two conditions: the function accesses the aggregate
    directly (*has_direct_access*), or it touches at most
    WHOLE_STRUCT_KEY_THRESHOLD distinct keys (zero keys included). A direct
    access therefore suffices regardless of how many keys were touched.
    """
    return bool(has_direct_access) or len(field_counts) <= WHOLE_STRUCT_KEY_THRESHOLD
def is_field_by_field_access(field_counts: Counter) -> bool:
    """Return True when at least FIELD_BY_FIELD_KEY_THRESHOLD distinct keys are accessed."""
    distinct_keys = len(field_counts)
    return distinct_keys >= FIELD_BY_FIELD_KEY_THRESHOLD
def is_hot_cold_split(hot_keys: set[str], cold_keys: set[str]) -> bool:
    """Return True for a hot/cold split: one or two hot keys and at least two cold keys.

    Hot keys are those accessed on the main path; cold keys are those
    accessed only inside branches (as classified by the caller).
    """
    hot_count = len(hot_keys)
    enough_cold = len(cold_keys) > 1
    return 0 < hot_count < 3 and enough_cold
def is_bulk_batched_access(iterates_over_list: bool, body_accesses_uniform: bool) -> bool:
    """Return True when the function loops over a list of the aggregate with uniform field access.

    Equivalent to ``iterates_over_list and body_accesses_uniform``.
    """
    if not iterates_over_list:
        return iterates_over_list
    return body_accesses_uniform
def dominant_pattern(per_function_pattern_counts: dict[str, int]) -> AccessPattern:
    """Determine the aggregate-level dominant pattern from per-function pattern counts.

    The pattern with the highest count wins (ties go to the first key in
    insertion order, as with ``max``). If the winner's share of the total is
    <= AGGREGATE_LEVEL_DOMINANCE_THRESHOLD, nothing dominates and "mixed" is
    returned.

    Empty input, or counts that sum to zero or less (e.g. every pattern seen
    0 times), also return "mixed" instead of dividing by zero.
    """
    if not per_function_pattern_counts:
        return "mixed"
    total = sum(per_function_pattern_counts.values())
    if total <= 0:
        return "mixed"
    winner = max(per_function_pattern_counts, key=per_function_pattern_counts.get)
    share = per_function_pattern_counts[winner] / total
    if share <= AGGREGATE_LEVEL_DOMINANCE_THRESHOLD:
        return "mixed"
    return winner
def detect_access_pattern(
    field_counts: Counter,
    has_direct_access: bool,
    hot_keys: set[str] | None = None,
    cold_keys: set[str] | None = None,
) -> AccessPattern:
    """Classify one function's access to an aggregate.

    Checks run in precedence order: whole_struct, then hot_cold_split (only
    when both *hot_keys* and *cold_keys* are supplied), then field_by_field;
    anything else is "mixed". "bulk_batched" is never returned here.
    """
    if is_whole_struct_access(field_counts, has_direct_access):
        return "whole_struct"
    split_info_given = hot_keys is not None and cold_keys is not None
    if split_info_given and is_hot_cold_split(hot_keys, cold_keys):
        return "hot_cold_split"
    return "field_by_field" if is_field_by_field_access(field_counts) else "mixed"
# Unqualified function names that detect_frequency_from_entry_point maps to a
# Frequency bucket. Matching is by exact name only (the caller's class is not
# consulted); the sets are checked in the order init, hot, per_turn, cold,
# per_discussion, per_request.
# -> "init"
INIT_CALLERS = frozenset({"__init__", "warmup"})
# -> "hot"
HOT_CALLERS = frozenset({"render_main_toolbar", "render_menu_bar", "render_frame", "update"})
# -> "per_turn"
PER_TURN_CALLERS = frozenset({
    "_send_anthropic_result", "_send_deepseek_result", "_send_minimax_result",
    "_send_qwen_result", "_send_grok_result", "_send_llama_result",
    "_send_gemini_result", "_send_gemini_cli_result",
    "process_user_request", "_handle_generate_send",
})
# -> "cold"
COLD_CALLERS = frozenset({"cleanup", "reset_session", "_classify_anthropic_error", "_classify_gemini_error"})
# -> "per_discussion"
PER_DISCUSSION_CALLERS = frozenset({"save_project", "load_project", "save_snapshot", "load_snapshot"})
# -> "per_request"
PER_REQUEST_CALLERS = frozenset({
    "_api_get_key", "_api_status", "_api_performance", "_api_gui",
    "_api_mma_status", "_api_comms", "_api_diagnostics",
})
def detect_frequency_from_entry_point(caller: str, caller_class: str) -> Frequency:
    """Map an entry-point function name to a Frequency bucket.

    *caller* is looked up in the module-level caller sets in a fixed order
    (init, hot, per_turn, cold, per_discussion, per_request); the first set
    containing it decides the bucket, and names in no set give "unknown".
    *caller_class* is accepted but not consulted.
    """
    lookup_order = (
        (INIT_CALLERS, "init"),
        (HOT_CALLERS, "hot"),
        (PER_TURN_CALLERS, "per_turn"),
        (COLD_CALLERS, "cold"),
        (PER_DISCUSSION_CALLERS, "per_discussion"),
        (PER_REQUEST_CALLERS, "per_request"),
    )
    return next((bucket for names, bucket in lookup_order if caller in names), "unknown")
def load_frequency_overrides(path: str) -> dict[str, Frequency]:
"""Load frequency overrides from a TOML file."""
p = Path(path)
if not p.exists():
return {}
with p.open("rb") as f:
data = tomllib.load(f)
out: dict[str, Frequency] = {}
for key, value in data.get("frequency", {}).items():
if isinstance(value, str):
out[key] = value
return out
def estimate_call_frequency(
    function: FunctionRef,
    callers: list[tuple[FunctionRef, str]],
    overrides: dict[str, Frequency],
) -> Frequency:
    """Estimate how often *function* runs.

    1. An entry in *overrides* keyed by ``function.fqname`` wins.
    2. Otherwise only the FIRST entry of *callers* is used: its fqname is
       reduced to the last dotted component and passed, with its class, to
       detect_frequency_from_entry_point.
    3. With no override and no callers the result is "unknown".
    """
    key = function.fqname
    if key in overrides:
        return overrides[key]
    if not callers:
        return "unknown"
    first_caller, caller_class = callers[0]
    short_name = first_caller.fqname.rsplit(".", 1)[-1]
    return detect_frequency_from_entry_point(short_name, caller_class)
# Cost-model constants, in microseconds (per the _US suffix).
# NOTE(review): MICROSECOND_BUDGET_PER_LLM_TURN, DEAD_FIELD_COST_PER_FIELD_US,
# COMPONENTIZATION_INDIRECTION_US and UNIFICATION_INDIRECTION_US are not used
# by the cost functions in this part of the file — confirm they are still needed.
MICROSECOND_BUDGET_PER_LLM_TURN: int = 50_000
# per_call_cost_us charges this per hot-path field (at least one field is charged).
BRANCH_DISPATCH_OVERHEAD_US: int = 100
# per_call_cost_us charges this per struct field.
ALLOCATION_OVERHEAD_US: int = 50
DEAD_FIELD_COST_PER_FIELD_US: int = 10
COMPONENTIZATION_INDIRECTION_US: int = 200
UNIFICATION_INDIRECTION_US: int = 300
def per_call_cost_us(struct_field_count: int, hot_path_field_count: int, struct_frozen: bool) -> int:
    """Estimate the cost of one call touching the aggregate, in microseconds.

    Sum of three terms:
    * ALLOCATION_OVERHEAD_US per struct field;
    * BRANCH_DISPATCH_OVERHEAD_US per hot-path field, charging at least one;
    * a flat 20 us surcharge when the struct is frozen.
    """
    allocation = struct_field_count * ALLOCATION_OVERHEAD_US
    dispatch = max(hot_path_field_count, 1) * BRANCH_DISPATCH_OVERHEAD_US
    frozen_surcharge = 20 if struct_frozen else 0
    return allocation + dispatch + frozen_surcharge
# Weight current_total_us applies to a per-call cost for each Frequency bucket.
# "unknown" is weighted 0.0, so an aggregate with unknown frequency gets a
# current cost of 0 and therefore 0 componentize/unify savings.
FREQUENCY_MULTIPLIER: dict[Frequency, float] = {
    "hot": 60.0,
    "per_turn": 1.0,
    "per_request": 1.0,
    "per_discussion": 1.0,
    "cold": 0.01,
    "init": 0.001,
    "unknown": 0.0,
}
def current_total_us(per_call_cost: int, frequency: Frequency) -> int:
    """Scale a per-call cost by the FREQUENCY_MULTIPLIER weight for *frequency*.

    The product is truncated toward zero with ``int``. An unlisted
    *frequency* raises KeyError.
    """
    weighted = per_call_cost * FREQUENCY_MULTIPLIER[frequency]
    return int(weighted)
def componentize_factor(
access_pattern: AccessPattern,
struct_field_count: int,
struct_frozen: bool,
hot_field_count: int = 0,
) -> float:
"""Determine the componentize factor per spec section 7.5."""
if access_pattern == "field_by_field" and struct_field_count > 10 and not struct_frozen:
return 0.30
if access_pattern == "hot_cold_split" and hot_field_count <= 2 and struct_field_count > 5:
return 0.40
if access_pattern in ("whole_struct", "bulk_batched"):
return -0.20
if access_pattern == "mixed":
return 0.0
return -0.10
def unify_factor(access_pattern: AccessPattern, struct_field_count: int, struct_frozen: bool) -> float:
"""Determine the unify factor per spec section 7.5."""
if access_pattern == "bulk_batched" and struct_field_count <= 3 and struct_frozen:
return 0.25
if access_pattern == "whole_struct" and struct_field_count <= 5 and struct_frozen:
return 0.15
if access_pattern == "field_by_field":
return -0.30
if access_pattern == "hot_cold_split":
return -0.10
if access_pattern == "mixed":
return 0.0
return 0.05
def recommended_direction(
    access_pattern: AccessPattern,
    struct_field_count: int,
    struct_frozen: bool,
    frequency: Frequency,
    hot_field_count: int = 0,
) -> RecommendedDirection:
    """Choose the decomposition direction per spec section 7.5.

    Rules, first match wins:
    * componentize: field_by_field with > 10 fields, or hot_cold_split with
      <= 2 hot fields;
    * unify: bulk_batched with <= 3 fields, or an unfrozen whole_struct with
      <= 5 fields;
    * insufficient_data: pattern is mixed or frequency is unknown;
    * hold: everything else. A frozen small whole_struct is treated as the
      ideal shape and lands here (it is deliberately excluded from unify).
    """
    wants_componentize = (
        (access_pattern == "field_by_field" and struct_field_count > 10)
        or (access_pattern == "hot_cold_split" and hot_field_count <= 2)
    )
    if wants_componentize:
        return "componentize"
    wants_unify = (
        (access_pattern == "bulk_batched" and struct_field_count <= 3)
        or (access_pattern == "whole_struct" and struct_field_count <= 5 and not struct_frozen)
    )
    if wants_unify:
        return "unify"
    if "mixed" == access_pattern or "unknown" == frequency:
        return "insufficient_data"
    return "hold"
def generate_rationale(
    aggregate: str,
    access_pattern: AccessPattern,
    frequency: Frequency,
    struct_field_count: int,
    struct_frozen: bool,
    direction: RecommendedDirection,
) -> str:
    """Generate the auto-rationale string per spec section 7.5.

    Format::

        "<aggregate>: access_pattern=<p>, frequency=<f>, struct_field_count=<n>,
         struct_frozen=<b>. Recommended: <direction> because <justification>."

    The justification matches the rule in recommended_direction that yields
    *direction*, so it does not contradict the values printed before it:

    * componentize with access_pattern == "hot_cold_split" cites the few hot
      fields, not field_by_field dead fields;
    * insufficient_data with a known (non-"mixed") pattern and
      frequency == "unknown" cites the unknown call frequency.

    Other combinations use the generic per-direction text; an unrecognised
    direction yields "no justification available".
    """
    justifications = {
        "componentize": "the access pattern is field_by_field and the struct has many dead fields",
        "unify": "the access pattern is uniform and the struct is small",
        "hold": "the current shape matches the access pattern",
        "insufficient_data": "runtime profiling is needed to determine the dominant pattern",
    }
    if direction == "componentize" and access_pattern == "hot_cold_split":
        justification = "the access pattern is hot_cold_split and only a few fields are on the hot path"
    elif direction == "insufficient_data" and access_pattern != "mixed" and frequency == "unknown":
        # The pattern is known; the missing input is how often the code runs.
        justification = "the call frequency is unknown and runtime profiling is needed to determine it"
    else:
        justification = justifications.get(direction, "no justification available")
    return (
        f"{aggregate}: access_pattern={access_pattern}, frequency={frequency}, "
        f"struct_field_count={struct_field_count}, struct_frozen={struct_frozen}. "
        f"Recommended: {direction} because {justification}."
    )
def compute_decomposition_cost(
    aggregate: str,
    access_pattern: AccessPattern,
    struct_field_count: int,
    struct_frozen: bool,
    frequency: Frequency,
    hot_field_count: int = 0,
) -> DecompositionCost:
    """Run the cost model for one aggregate and package the result.

    The current cost is per_call_cost_us scaled by current_total_us. Each
    savings figure is that cost times the componentize/unify factor,
    truncated to int, and reported as 0 when the factor is not positive.
    The direction comes from recommended_direction, the text from
    generate_rationale; batch_size is always None.
    """
    current_total = current_total_us(
        per_call_cost_us(struct_field_count, hot_path_field_count=hot_field_count, struct_frozen=struct_frozen),
        frequency,
    )

    def positive_share(factor: float) -> int:
        # Negative/zero factors mean "no savings", never a negative number.
        return int(current_total * factor) if factor > 0 else 0

    direction = recommended_direction(access_pattern, struct_field_count, struct_frozen, frequency, hot_field_count)
    return DecompositionCost(
        current_cost_estimate=current_total,
        componentize_savings=positive_share(
            componentize_factor(access_pattern, struct_field_count, struct_frozen, hot_field_count)
        ),
        unify_savings=positive_share(unify_factor(access_pattern, struct_field_count, struct_frozen)),
        recommended_direction=direction,
        recommended_rationale=generate_rationale(
            aggregate, access_pattern, frequency, struct_field_count, struct_frozen, direction
        ),
        batch_size=None,
        struct_field_count=struct_field_count,
        struct_frozen=struct_frozen,
    )
import json
import json
def read_input_json(path: str) -> Result[dict]:
    """Read a JSON file whose root must be an object and return Result[dict].

    Follows the stdlib I/O boundary pattern from error_handling.md: failures
    are converted to ErrorInfo rather than raised, and ``data`` is ``{}``
    whenever an error is reported.

    Error kinds:
        NOT_FOUND: the file could not be opened or read (OSError, e.g.
            missing file or permission denied).
        INVALID_INPUT: the bytes are not valid UTF-8, the text is not valid
            JSON, or the JSON root is not an object.
    """
    p = Path(path)
    try:
        raw = p.read_text(encoding="utf-8")
    except OSError as e:
        return Result(
            data={},
            errors=[ErrorInfo(
                kind=ErrorKind.NOT_FOUND,
                message=f"Cannot read {path}: {e}",
                source="read_input_json",
                original=e,
            )],
        )
    except UnicodeDecodeError as e:
        # The file exists; its content is bad, same category as malformed JSON.
        return Result(
            data={},
            errors=[ErrorInfo(
                kind=ErrorKind.INVALID_INPUT,
                message=f"Cannot decode {path} as UTF-8: {e}",
                source="read_input_json",
                original=e,
            )],
        )
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        return Result(
            data={},
            errors=[ErrorInfo(
                kind=ErrorKind.INVALID_INPUT,
                message=f"Malformed JSON in {path}: {e}",
                source="read_input_json",
                original=e,
            )],
        )
    if not isinstance(data, dict):
        return Result(
            data={},
            errors=[ErrorInfo(
                kind=ErrorKind.INVALID_INPUT,
                message=f"JSON root in {path} is not a dict",
                source="read_input_json",
            )],
        )
    return Result(data=data)
# Input JSON files consumed by the cross-audit step, keyed by audit name.
# "filename" is looked up inside the audit-inputs directory by
# run_all_cross_audit_reads; "producer" records the command that presumably
# generates the file (informational — not executed by anything in this part
# of the file).
INPUT_JSON_CONTRACTS: dict[str, dict[str, str]] = {
    "audit_weak_types": {
        "producer": "scripts/audit_weak_types.py --json",
        "filename": "audit_weak_types.json",
    },
    "audit_exception_handling": {
        "producer": "scripts/audit_exception_handling.py --json",
        "filename": "audit_exception_handling.json",
    },
    "audit_optional_in_3_files": {
        "producer": "scripts/audit_optional_in_3_files.py --json",
        "filename": "audit_optional_in_3_files.json",
    },
    "audit_no_models_config_io": {
        "producer": "scripts/audit_no_models_config_io.py --json",
        "filename": "audit_no_models_config_io.json",
    },
    "audit_main_thread_imports": {
        "producer": "scripts/audit_main_thread_imports.py --json",
        "filename": "audit_main_thread_imports.json",
    },
    "type_registry": {
        "producer": "scripts/generate_type_registry.py --json",
        "filename": "type_registry.json",
    },
}
def find_enclosing_function(
    file: str,
    line: int,
    function_refs: list[FunctionRef],
) -> FunctionRef | None:
    """Tier 1 of the 3-tier mapping: find the function ref that covers (file, line).

    Among refs in the same *file* that start at or before *line*, returns
    the one with the greatest start line; on a tie, the earliest such ref in
    *function_refs* wins. End lines are not known, so a line past a
    function's end still maps to it. Returns None if no ref qualifies.
    """
    best: FunctionRef | None = None
    for ref in function_refs:
        if ref.file != file or ref.line > line:
            continue
        if best is None or ref.line > best.line:
            best = ref
    return best
def compute_result_coverage(
    producers: list[FunctionRef],
    consumers: list[FunctionRef],
    branches_on_errors: set[str],
) -> ResultCoverage:
    """Summarise how much of an aggregate's pipeline uses Result[T].

    * result_producers equals the number of producers: the caller is
      expected to pass only Result[T] producers, no filtering happens here.
    * result_consumers counts DISTINCT consumer fqnames that appear in
      *branches_on_errors*, while total_consumers counts list entries.

    Percentages in the summary are 0 when the corresponding total is 0.
    """
    total_producers = len(producers)
    result_producers = total_producers
    total_consumers = len(consumers)
    branching_names = {ref.fqname for ref in consumers if ref.fqname in branches_on_errors}
    result_consumers = len(branching_names)

    def percent(part: int, whole: int) -> float:
        return part / whole * 100 if whole > 0 else 0

    summary = (
        f"{result_producers}/{total_producers} producers return Result[T] "
        f"({percent(result_producers, total_producers):.0f}%); "
        f"{result_consumers}/{total_consumers} consumers branch on .errors "
        f"({percent(result_consumers, total_consumers):.0f}%)"
    )
    return ResultCoverage(
        total_producers=total_producers,
        result_producers=result_producers,
        total_consumers=total_consumers,
        result_consumers=result_consumers,
        summary=summary,
    )
def compute_type_alias_coverage(total_sites: int, typed_sites: int) -> TypeAliasCoverage:
    """Summarise typed vs untyped usage sites of an aggregate.

    untyped_sites is ``total_sites - typed_sites`` (not clamped). Percentages
    in the summary are 0 when *total_sites* is 0.
    """
    untyped = total_sites - typed_sites

    def share(count: int) -> float:
        return count / total_sites * 100 if total_sites > 0 else 0

    summary = (
        f"{total_sites} total sites; "
        f"{typed_sites} typed ({share(typed_sites):.0f}%); "
        f"{untyped} untyped ({share(untyped):.0f}%)"
    )
    return TypeAliasCoverage(
        total_sites=total_sites,
        typed_sites=typed_sites,
        untyped_sites=untyped,
        summary=summary,
    )
def aggregate_cross_audit_findings(
    audit_name: str,
    findings: list[dict],
    example_file: str,
    example_line: int,
) -> CrossAuditFindings:
    """Collapse one audit's findings into a per-aggregate CrossAuditFindings.

    A single CrossAuditFinding (site_count = len(findings)) is placed in the
    bucket mapped from *audit_name*; every other bucket is an empty tuple.
    When *findings* is empty, or *audit_name* is not one of the five known
    audits, all five buckets are empty tuples (no zero-count entries).
    """
    bucket_for_audit = {
        "audit_weak_types": "weak_types",
        "audit_exception_handling": "exception_handling",
        "audit_optional_in_3_files": "optional_in_baseline",
        "audit_no_models_config_io": "config_io_ownership",
        "audit_main_thread_imports": "import_graph",
    }
    slots: dict[str, tuple[CrossAuditFinding, ...]] = {name: () for name in bucket_for_audit.values()}
    target_bucket = bucket_for_audit.get(audit_name)
    if not findings or target_bucket is None:
        return CrossAuditFindings(**slots)
    site_count = len(findings)
    slots[target_bucket] = (
        CrossAuditFinding(
            audit_script=audit_name,
            site_count=site_count,
            example_file=example_file,
            example_line=example_line,
            note=f"{site_count} sites in producer+consumer functions",
        ),
    )
    return CrossAuditFindings(**slots)
def run_all_cross_audit_reads(audit_inputs_dir: str) -> dict[str, dict]:
    """Read every input JSON listed in INPUT_JSON_CONTRACTS from *audit_inputs_dir*.

    Always returns exactly one entry per audit name, so callers can index by
    name without a KeyError. An entry is ``{}`` when the directory is
    missing, when the file is missing, or when read_input_json reports an
    error (unreadable, malformed JSON, non-object root); otherwise it is the
    parsed JSON object.
    """
    base = Path(audit_inputs_dir)
    dir_present = base.is_dir()
    out: dict[str, dict] = {}
    for audit_name, contract in INPUT_JSON_CONTRACTS.items():
        json_path = base / contract["filename"]
        if not dir_present or not json_path.exists():
            # Missing inputs are tolerated, but the key is still present.
            out[audit_name] = {}
            continue
        result = read_input_json(str(json_path))
        out[audit_name] = result.data if result.ok else {}
    return out
# Number of operands each v2 DSL word takes; operands precede the word
# (postfix), one record per line, as written by to_dsl_v2.
# NOTE(review): keep in sync with to_dsl_v2. "cross-audit-findings" is declared
# with arity 5 but to_dsl_v2 writes it with a single literal operand ("5") —
# confirm how parse_dsl_v2 consumes that word.
DSL_WORD_ARITY_V2: dict[str, int] = {
    "kind": 1,
    "mem-dim": 1,
    "fn-ref": 4,
    "access-pattern": 1,
    "ap-evidence": 4,
    "frequency": 1,
    "freq-evidence": 4,
    "result-coverage": 5,
    "type-alias-coverage": 4,
    "cross-audit-finding": 5,
    "cross-audit-findings": 5,
    "decomp-cost": 8,
    "opt-candidate": 7,
    "is-candidate": 1,
}
import re
from datetime import date as date_mod
def _atom(s: str) -> str:
"""Format a string as a postfix DSL atom (bare or quoted)."""
if any(c in s for c in ('"', "'", " ", "\t", "\n", "(", ")", "{", "}")):
return f'"{s}"'
return s
def to_dsl_v2(profile: AggregateProfile, generated_date: str = "") -> str:
    """Serialize an AggregateProfile to the v2 postfix DSL (flat sections).

    Each section starts with a ``\\ === name ===`` comment line, followed by
    one record per line: operands first, then the DSL word (e.g.
    ``"fqname" "file" 12 "producer" fn-ref``). Strings are double-quoted
    without escaping. Booleans become ``true``/``false`` and a missing batch
    size becomes ``nil``. Tuple fields such as ``field_accesses`` and
    ``affected_files`` are emitted as their length only.

    The ``result-coverage`` and ``type-alias-coverage`` records include the
    ``summary`` string. This makes their operand counts (5 and 4) match
    DSL_WORD_ARITY_V2 and the ResultCoverage / TypeAliasCoverage field
    counts, so an arity-driven parser does not consume operands belonging
    to the previous record.
    """
    lines: list[str] = []
    lines.append(f'\\ AggregateProfile: "{profile.name}"')
    lines.append(f"\\ generated {generated_date} by src.code_path_audit v2")
    lines.append("")
    lines.append("\\ === aggregate_kind ===")
    lines.append(f' "{profile.aggregate_kind}" kind')
    lines.append("")
    lines.append("\\ === memory_dim ===")
    lines.append(f' "{profile.memory_dim}" mem-dim')
    lines.append("")
    lines.append(f"\\ === producers ({len(profile.producers)} items) ===")
    for p in profile.producers:
        lines.append(f' "{p.fqname}" "{p.file}" {p.line} "{p.role}" fn-ref')
    lines.append("")
    lines.append(f"\\ === consumers ({len(profile.consumers)} items) ===")
    for c in profile.consumers:
        lines.append(f' "{c.fqname}" "{c.file}" {c.line} "{c.role}" fn-ref')
    lines.append("")
    lines.append("\\ === access_pattern ===")
    lines.append(f' "{profile.access_pattern}" access-pattern')
    lines.append("")
    lines.append(f"\\ === access_pattern_evidence ({len(profile.access_pattern_evidence)} items) ===")
    for ev in profile.access_pattern_evidence:
        lines.append(f' "{ev.function.fqname}" "{ev.pattern}" {len(ev.field_accesses)} "{ev.confidence}" ap-evidence')
    lines.append("")
    lines.append("\\ === frequency ===")
    lines.append(f' "{profile.frequency}" frequency')
    lines.append("")
    lines.append(f"\\ === frequency_evidence ({len(profile.frequency_evidence)} items) ===")
    for ev in profile.frequency_evidence:
        lines.append(f' "{ev.function.fqname}" "{ev.frequency}" "{ev.source}" "{ev.note}" freq-evidence')
    lines.append("")
    rc = profile.result_coverage
    lines.append("\\ === result_coverage ===")
    # 5 operands: the 4 counts plus the summary, matching the declared arity.
    lines.append(
        f' {rc.total_producers} {rc.result_producers} {rc.total_consumers} {rc.result_consumers}'
        f' "{rc.summary}" result-coverage'
    )
    lines.append("")
    tac = profile.type_alias_coverage
    lines.append("\\ === type_alias_coverage ===")
    # 4 operands: the 3 counts plus the summary, matching the declared arity.
    lines.append(f' {tac.total_sites} {tac.typed_sites} {tac.untyped_sites} "{tac.summary}" type-alias-coverage')
    lines.append("")
    lines.append("\\ === cross_audit_findings ===")
    caf = profile.cross_audit_findings
    buckets = (
        caf.weak_types,
        caf.exception_handling,
        caf.optional_in_baseline,
        caf.config_io_ownership,
        caf.import_graph,
    )
    for bucket in buckets:
        for f in bucket:
            lines.append(f' "{f.audit_script}" {f.site_count} "{f.example_file}" {f.example_line} "{f.note}" cross-audit-finding')
    # NOTE(review): "5" is the number of buckets; confirm how the parser consumes this word.
    lines.append(" 5 cross-audit-findings")
    lines.append("")
    dc = profile.decomposition_cost
    lines.append("\\ === decomposition_cost ===")
    batch_size_str = str(dc.batch_size) if dc.batch_size is not None else "nil"
    lines.append(f" {dc.current_cost_estimate} {dc.componentize_savings} {dc.unify_savings} \"{dc.recommended_direction}\" \"{dc.recommended_rationale}\" {batch_size_str} {dc.struct_field_count} {str(dc.struct_frozen).lower()} decomp-cost")
    lines.append("")
    lines.append(f"\\ === optimization_candidates ({len(profile.optimization_candidates)} items) ===")
    for cand in profile.optimization_candidates:
        lines.append(f' "{cand.candidate}" "{cand.direction}" {len(cand.affected_files)} {cand.estimated_savings_us} "{cand.effort}" "{cand.priority}" "{cand.cross_ref}" opt-candidate')
    lines.append("")
    lines.append("\\ === is_candidate ===")
    lines.append(f" {'true' if profile.is_candidate else 'false'} is-candidate")
    return "\n".join(lines)
def to_markdown(profile: AggregateProfile) -> str:
    """Render one aggregate's profile as markdown.

    Layout: a ``#`` title with kind / memory dim / candidate flag, then
    nine ``##`` sections (pipeline summary, access pattern, frequency,
    result coverage, type alias coverage, cross-audit findings table,
    decomposition cost, optimization candidates, verdict). Every section
    except the final Verdict is followed by a blank line. Table cells are
    not escaped.
    """
    dc = profile.decomposition_cost
    caf = profile.cross_audit_findings
    out: list[str] = [
        f"# Aggregate Profile: {profile.name}",
        "",
        f"**Aggregate kind:** {profile.aggregate_kind}",
        f"**Memory dim:** {profile.memory_dim}",
        f"**Is candidate:** {profile.is_candidate}",
        "",
    ]

    def add_section(title: str, body: list[str]) -> None:
        out.extend([f"## {title}", "", *body, ""])

    add_section("Pipeline summary", [
        f"- Producers: {len(profile.producers)}",
        f"- Consumers: {len(profile.consumers)}",
    ])
    add_section("Access pattern", [
        f"**Dominant pattern:** {profile.access_pattern}",
        f"**Evidence count:** {len(profile.access_pattern_evidence)}",
    ])
    add_section("Frequency", [
        f"**Dominant frequency:** {profile.frequency}",
        f"**Evidence count:** {len(profile.frequency_evidence)}",
    ])
    add_section("Result coverage", [f"**Summary:** {profile.result_coverage.summary}"])
    add_section("Type alias coverage", [f"**Summary:** {profile.type_alias_coverage.summary}"])
    all_findings = [
        *caf.weak_types,
        *caf.exception_handling,
        *caf.optional_in_baseline,
        *caf.config_io_ownership,
        *caf.import_graph,
    ]
    add_section("Cross-audit findings", [
        "| Audit script | Site count | Example | Note |",
        "|---|---|---|---|",
        *(
            f"| {item.audit_script} | {item.site_count} | {item.example_file}:{item.example_line} | {item.note} |"
            for item in all_findings
        ),
    ])
    add_section("Decomposition cost", [
        f"**Current cost estimate:** {dc.current_cost_estimate} us",
        f"**Componentize savings:** {dc.componentize_savings} us",
        f"**Unify savings:** {dc.unify_savings} us",
        f"**Recommended direction:** {dc.recommended_direction}",
        f"**Rationale:** {dc.recommended_rationale}",
    ])
    candidate_lines = [
        f"- **{cand.direction}** ({cand.effort}, {cand.priority}): {cand.candidate}"
        for cand in profile.optimization_candidates
    ]
    add_section("Optimization candidates", candidate_lines or ["_(none)_"])
    out.extend(["## Verdict", "", f"{dc.recommended_rationale}"])
    return "\n".join(out)
def to_tree(profile: AggregateProfile) -> str:
"""Render the per-aggregate prefix tree (box-drawing)."""
lines: list[str] = [f"Metadata: {profile.name}"]
lines.append(f"|- kind: {profile.aggregate_kind}")
lines.append(f"|- memory_dim: {profile.memory_dim}")
lines.append(f"|- producers: [{len(profile.producers)}]")
for p in profile.producers:
lines.append(f"| |- {p.fqname} ({p.role})")
lines.append(f"|- consumers: [{len(profile.consumers)}]")
for c in profile.consumers:
lines.append(f"| |- {c.fqname} ({c.role})")
lines.append(f"|- access_pattern: {profile.access_pattern}")
lines.append(f"|- frequency: {profile.frequency}")
lines.append(f"|- result_coverage: {profile.result_coverage.summary}")
lines.append(f"|- type_alias_coverage: {profile.type_alias_coverage.summary}")
cf_total = (
len(profile.cross_audit_findings.weak_types) +
len(profile.cross_audit_findings.exception_handling) +
len(profile.cross_audit_findings.optional_in_baseline) +
len(profile.cross_audit_findings.config_io_ownership) +
len(profile.cross_audit_findings.import_graph)
)
lines.append(f"|- cross_audit_findings: {cf_total} findings")
lines.append(f"|- decomposition_cost: {profile.decomposition_cost.recommended_direction} ({profile.decomposition_cost.current_cost_estimate} us)")
lines.append(f"|- optimization_candidates: [{len(profile.optimization_candidates)}]")
return "\n".join(lines)
def parse_dsl_v2(text: str) -> Result[dict]:
    """Parse v2 postfix DSL text back into nested Python values (round-trip).

    Tokenizing, line by line:

    * a backslash outside a quoted string starts a comment that runs to the
      end of the line;
    * ``"..."`` is one token holding the text between the quotes (an
      unterminated quote runs to end of line); it always evaluates to ``str``;
    * any other run of non-whitespace characters is a bare word.

    Evaluation, left to right on a stack:

    * bare ``list`` with a (non-bool) ``int`` count N on top pops the count,
      then pops the N values beneath it and pushes them as a Python list;
    * a bare word found in ``DSL_WORD_ARITY_V2`` pops that many values and
      pushes ``{"_tag": word, "_args": [...]}``;
    * bare ``true`` / ``false`` / ``nil`` push ``True`` / ``False`` / ``None``;
    * a bare word made of one optional leading ``-`` followed by ASCII digits
      pushes an ``int``;
    * any other bare word is pushed unchanged as a ``str``.

    Returns:
        ``Result`` whose ``data`` is the single value left on the stack, or
        ``{"_sections": stack}`` when zero or several values remain.
    """
    # Each token is (text, came_from_quoted_string).
    tokens: list[tuple[str, bool]] = []
    for line in text.splitlines():
        end = len(line)
        i = 0
        while i < end:
            c = line[i]
            if c.isspace():
                i += 1
                continue
            if c == "\\":
                # Comment marker outside a string: ignore the rest of the line.
                # Backslashes inside quoted strings (e.g. file paths) are kept.
                break
            if c == '"':
                j = line.find('"', i + 1)
                if j == -1:
                    j = end
                tokens.append((line[i + 1 : j], True))
                i = j + 1
                continue
            j = i
            while j < end and not line[j].isspace() and line[j] != "\\":
                j += 1
            tokens.append((line[i:j], False))
            i = j

    stack: list = []
    for tok, quoted in tokens:
        if quoted:
            # Quoted text is data, never a keyword or number, so string values
            # such as "nil", "42" or a DSL word name survive the round trip.
            stack.append(tok)
            continue
        top_is_count = bool(stack) and isinstance(stack[-1], int) and not isinstance(stack[-1], bool)
        if tok == "list" and top_is_count:
            count = stack.pop()
            items = stack[-count:] if count > 0 else []
            if count > 0:
                del stack[-count:]
            stack.append(items)
            continue
        if tok in DSL_WORD_ARITY_V2:
            nargs = DSL_WORD_ARITY_V2[tok]
            args = stack[-nargs:] if nargs else []
            if nargs:
                del stack[-nargs:]
            stack.append({"_tag": tok, "_args": args})
            continue
        # One optional leading "-" then ASCII digits only, so int() cannot fail
        # (lstrip("-") would accept "--5"; isdigit() alone accepts "²").
        digits = tok[1:] if tok.startswith("-") else tok
        if tok in ("true", "false"):
            stack.append(tok == "true")
        elif tok == "nil":
            stack.append(None)
        elif digits.isascii() and digits.isdigit():
            stack.append(int(tok))
        else:
            stack.append(tok)
    if len(stack) != 1:
        return Result(
            data={"_sections": stack},
        )
    return Result(data=stack[0])
# Aggregates that run_audit profiles from the producer/consumer graph.
# synthesize_aggregate_profile also uses membership in this tuple to pick the
# "typealias" aggregate kind.
AGGREGATES_IN_SCOPE: tuple[str, ...] = (
    "Metadata",
    "FileItem", "FileItems",
    "CommsLogEntry", "CommsLog",
    "HistoryMessage", "History",
    "ToolDefinition", "ToolCall",
    "Result",
)
# Forward-compat placeholders (not on master yet). run_audit emits a stub
# profile with is_candidate=True for each of these.
CANDIDATE_AGGREGATES: tuple[str, ...] = (
    "ToolSpec", "ChatMessage", "ProviderHistory",
)
def synthesize_aggregate_profile(
    aggregate: str,
    pcg_producers: dict[str, list[FunctionRef]],
    pcg_consumers: dict[str, list[FunctionRef]],
    audit_inputs: dict[str, dict],
    overrides: dict,
    is_candidate: bool,
    src_dir: str = "src",
) -> AggregateProfile:
    """Synthesize one AggregateProfile.

    Candidate aggregates (``is_candidate=True``) get a stub profile. It has no
    producers or consumers, "mixed" access, "unknown" frequency, zeroed
    coverage and an "insufficient_data" decomposition verdict. Only
    ChatMessage is given a memory dimension ("discussion").

    For every other aggregate:

    * producers and consumers are looked up in the PCG maps;
    * the memory dimension comes from ``classify_memory_dim``, honouring
      ``overrides["memory_dim"]`` when ``overrides`` is a dict;
    * access pattern, type-alias coverage, decomposition cost and optimization
      candidates come from ``src.code_path_audit_analysis``, analysing the
      sources under ``src_dir``.

    Frequency is fixed at "per_turn" (it is not measured here).

    Args:
        aggregate: Aggregate name, e.g. an entry of AGGREGATES_IN_SCOPE.
        pcg_producers: Aggregate name -> producing functions.
        pcg_consumers: Aggregate name -> consuming functions.
        audit_inputs: Cross-audit inputs. Only ``["type_registry"]["types"]``
            is read, and only when ``type_registry`` is a dict.
        overrides: Manual overrides. Only ``["memory_dim"]`` is read.
        is_candidate: Emit a candidate stub instead of analysing the aggregate.
        src_dir: Source root handed to the analysis helpers (default "src").
    """
    if is_candidate:
        return AggregateProfile(
            name=aggregate,
            aggregate_kind="candidate_dataclass",
            memory_dim="discussion" if aggregate == "ChatMessage" else "unknown",
            producers=(),
            consumers=(),
            access_pattern="mixed",
            access_pattern_evidence=(),
            frequency="unknown",
            frequency_evidence=(),
            result_coverage=ResultCoverage(0, 0, 0, 0, ""),
            type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""),
            cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
            decomposition_cost=DecompositionCost(0, 0, 0, "insufficient_data", "candidate aggregate; would be detected after any_type_componentization_20260621 merges", None, 0, False),
            optimization_candidates=(),
            is_candidate=True,
        )
    producers = tuple(pcg_producers.get(aggregate, []))
    consumers = tuple(pcg_consumers.get(aggregate, []))
    kind: AggregateKind = "typealias" if aggregate in AGGREGATES_IN_SCOPE else "dataclass"
    memory_dim = classify_memory_dim(
        aggregate,
        producers[0].file if producers else "",
        overrides.get("memory_dim", {}) if isinstance(overrides, dict) else {},
    )
    from src.code_path_audit_analysis import (
        aggregate_pattern_from_consumers,
        compute_real_type_alias_coverage,
        compute_real_decomposition_cost,
        extract_real_optimization_candidates,
    )
    raw_registry = audit_inputs.get("type_registry")
    type_registry = raw_registry.get("types", {}) if isinstance(raw_registry, dict) else {}
    # Single source of truth for the (not yet measured) call frequency.
    frequency: Frequency = "per_turn"
    pattern, _per_pattern_counts, evidence = aggregate_pattern_from_consumers(
        consumers, aggregate, type_registry, src_dir
    )
    tac = compute_real_type_alias_coverage(aggregate, producers, consumers, type_registry, src_dir)
    producer_count = len({f.fqname for f in producers})
    consumer_count = len({f.fqname for f in consumers})
    # NOTE(review): result_producers is set to producer_count, i.e. every
    # producer is counted as Result-returning without any check -- confirm.
    rc = ResultCoverage(
        total_producers=producer_count,
        result_producers=producer_count,
        total_consumers=consumer_count,
        result_consumers=0,
        summary=f"{producer_count} producers, {consumer_count} consumers",
    )
    dc = compute_real_decomposition_cost(aggregate, producers, consumers, pattern, frequency, type_registry, src_dir)
    candidates = extract_real_optimization_candidates(aggregate, producers, consumers, dc, type_registry, src_dir)
    freq_evidence = tuple(
        FrequencyEvidence(function=f, frequency=frequency, source="static_analysis", note=f"producer from {f.file}")
        for f in producers[:5]
    )
    return AggregateProfile(
        name=aggregate,
        aggregate_kind=kind,
        memory_dim=memory_dim,
        producers=producers,
        consumers=consumers,
        access_pattern=pattern,
        access_pattern_evidence=evidence,
        frequency=frequency,
        frequency_evidence=freq_evidence,
        result_coverage=rc,
        type_alias_coverage=tac,
        cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
        decomposition_cost=dc,
        optimization_candidates=candidates,
        is_candidate=False,
    )
@dataclass(frozen=True)
class AuditSummary:
    """Immutable outcome of an audit run: the profiles and where they were written."""

    # Synthesized profiles. run_audit stores in-scope aggregates first, then candidates.
    aggregate_profiles: tuple[AggregateProfile, ...]
    # Profile name -> path (as str) of its .dsl file; a fresh empty dict by default.
    output_paths: dict[str, str] = field(default_factory=dict)
def run_audit(
    src_dir: str,
    audit_inputs_dir: str,
    output_dir: str,
    date: str,
) -> Result[AuditSummary]:
    """Run the full v2 audit pipeline.

    Steps:

    1. Read the cross-audit inputs from ``audit_inputs_dir``.
    2. Build the PCG over ``src_dir``.
    3. Synthesize a profile for every AGGREGATES_IN_SCOPE entry, then one for
       every CANDIDATE_AGGREGATES entry.
    4. Write ``<name>.dsl``, ``<name>.md`` and ``<name>.tree`` for each profile
       into ``<output_dir>/<date>/aggregates/``.

    Returns:
        ``Result`` wrapping an AuditSummary whose ``output_paths`` maps each
        profile name to its ``.dsl`` path. If building the PCG fails, the
        summary is empty, nothing is written and the PCG errors are returned.
    """
    audit_inputs = run_all_cross_audit_reads(audit_inputs_dir)
    pcg_result = build_pcg(src_dir)
    if not pcg_result.ok:
        return Result(data=AuditSummary(aggregate_profiles=(), output_paths={}), errors=pcg_result.errors)
    pcg = pcg_result.data
    overrides: dict = {}  # no override source is loaded yet; always empty here
    # In-scope aggregates first, then candidates (same order as before).
    work: list[tuple[str, bool]] = [(name, False) for name in AGGREGATES_IN_SCOPE]
    work += [(name, True) for name in CANDIDATE_AGGREGATES]
    profiles: list[AggregateProfile] = [
        synthesize_aggregate_profile(
            aggregate=name,
            pcg_producers=pcg.producers,
            pcg_consumers=pcg.consumers,
            audit_inputs=audit_inputs,
            overrides=overrides,
            is_candidate=is_candidate,
        )
        for name, is_candidate in work
    ]
    # Loop-invariant: the per-aggregate output directory is created once.
    agg_dir = Path(output_dir) / date / "aggregates"
    agg_dir.mkdir(parents=True, exist_ok=True)
    output_paths: dict[str, str] = {}
    for profile in profiles:
        dsl_path = agg_dir / f"{profile.name}.dsl"
        dsl_path.write_text(to_dsl_v2(profile, generated_date=date), encoding="utf-8")
        (agg_dir / f"{profile.name}.md").write_text(to_markdown(profile), encoding="utf-8")
        (agg_dir / f"{profile.name}.tree").write_text(to_tree(profile), encoding="utf-8")
        output_paths[profile.name] = str(dsl_path)
    return Result(data=AuditSummary(aggregate_profiles=tuple(profiles), output_paths=output_paths))
def _render_rollup_summary(profiles: tuple[AggregateProfile, ...]) -> str:
    """Build summary.md: profiles grouped by memory dim, then per-profile coverage."""
    lines: list[str] = ["# Code Path & Data Pipeline Audit Summary", "", f"Generated for {len(profiles)} aggregates", ""]
    lines.append("## 4-mem-dim rollup")
    lines.append("")
    by_dim: dict[str, list[str]] = {}
    for p in profiles:
        by_dim.setdefault(p.memory_dim, []).append(p.name)
    for dim, names in sorted(by_dim.items()):
        lines.append(f"- **{dim}** ({len(names)}): {', '.join(names)}")
    lines.append("")
    lines.append("## Cross-validation verdict")
    lines.append("")
    for p in profiles:
        lines.append(f"- **{p.name}**: result_coverage={p.result_coverage.summary}; type_alias_coverage={p.type_alias_coverage.summary}")
    return "\n".join(lines)


def _render_rollup_cross_audit(profiles: tuple[AggregateProfile, ...]) -> str:
    """Build cross_audit_summary.md: per-profile finding counts per audit group plus a total."""
    lines: list[str] = ["# Cross-Audit Summary", "", "| Aggregate | weak_types | exception_handling | optional_in_baseline | config_io | import_graph | total |", "|---|---|---|---|---|---|---|"]
    for p in profiles:
        cf = p.cross_audit_findings
        counts = (
            len(cf.weak_types),
            len(cf.exception_handling),
            len(cf.optional_in_baseline),
            len(cf.config_io_ownership),
            len(cf.import_graph),
        )
        cells = " | ".join(str(c) for c in counts)
        lines.append(f"| {p.name} | {cells} | {sum(counts)} |")
    return "\n".join(lines)


def _render_rollup_decomposition_matrix(profiles: tuple[AggregateProfile, ...]) -> str:
    """Build decomposition_matrix.md: top 10 componentize/unify profiles by estimated savings."""
    lines: list[str] = ["# Decomposition Matrix", "", "## Top 10 candidates by estimated savings", "", "| Rank | Aggregate | Direction | Est. savings (us) | Frequency | Effort | Priority |", "|---|---|---|---|---|---|---|"]
    # NOTE(review): savings adds componentize_savings + unify_savings regardless
    # of which direction is recommended -- confirm this is the intended metric.
    ranked = [
        (p, p.decomposition_cost.componentize_savings + p.decomposition_cost.unify_savings)
        for p in profiles
        if p.decomposition_cost.recommended_direction in ("componentize", "unify")
    ]
    # Stable sort: ties keep their original profile order.
    ranked.sort(key=lambda x: -x[1])
    for rank, (p, savings) in enumerate(ranked[:10], 1):
        # Effort and priority are not estimated yet.
        lines.append(f"| {rank} | {p.name} | {p.decomposition_cost.recommended_direction} | {savings} | {p.frequency} | n/a | n/a |")
    return "\n".join(lines)


def _render_rollup_candidates(profiles: tuple[AggregateProfile, ...]) -> str:
    """Build candidates.md: one line per candidate (forward-compat placeholder) profile."""
    candidates = [p for p in profiles if p.is_candidate]
    # The count is derived from the data rather than hard-coded, so the intro
    # stays accurate for any summary (e.g. the empty one on PCG failure).
    lines: list[str] = ["# Candidate Aggregates", "", f"The {len(candidates)} candidate aggregates (forward-compat placeholders for any_type_componentization_20260621, NOT on master).", ""]
    for p in candidates:
        lines.append(f"- **{p.name}**: candidate; would be detected after any_type_componentization_20260621 merges")
    return "\n".join(lines)


def render_rollups(summary: AuditSummary, output_dir: Path) -> dict[str, str]:
    """Render the 4 top-level rollup files.

    Creates ``output_dir`` if needed and writes, in order, ``summary.md``,
    ``cross_audit_summary.md``, ``decomposition_matrix.md`` and
    ``candidates.md`` (UTF-8, overwriting existing files).

    Returns:
        File name -> written path (as str), in the write order above.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    profiles = summary.aggregate_profiles
    renderers = (
        ("summary.md", _render_rollup_summary),
        ("cross_audit_summary.md", _render_rollup_cross_audit),
        ("decomposition_matrix.md", _render_rollup_decomposition_matrix),
        ("candidates.md", _render_rollup_candidates),
    )
    written: dict[str, str] = {}
    for filename, render in renderers:
        path = output_dir / filename
        path.write_text(render(profiles), encoding="utf-8")
        written[filename] = str(path)
    return written
def code_path_audit_v2(
    src_dir: str = "src",
    audit_inputs_dir: str = "tests/artifacts/audit_inputs",
    output_dir: str = "docs/reports/code_path_audit",
    date: str | None = None,
) -> dict:
    """MCP tool wrapper for the v2 audit.

    Runs ``run_audit``; when ``date`` is falsy, today's ISO date is used.
    Returns a plain dict with two keys:

    * ``"profiles"``: one summary dict per profile (name, kind, memory_dim,
      access_pattern, frequency, recommended_direction, is_candidate);
    * ``"errors"``: the ``ui_message()`` text of every error in the result.
    """
    if date:
        date_str = date
    else:
        date_str = date_mod.today().isoformat()
    outcome = run_audit(src_dir=src_dir, audit_inputs_dir=audit_inputs_dir, output_dir=output_dir, date=date_str)

    profile_rows: list[dict] = []
    for profile in outcome.data.aggregate_profiles:
        cost = profile.decomposition_cost
        profile_rows.append(
            {
                "name": profile.name,
                "kind": profile.aggregate_kind,
                "memory_dim": profile.memory_dim,
                "access_pattern": profile.access_pattern,
                "frequency": profile.frequency,
                "recommended_direction": cost.recommended_direction,
                "is_candidate": profile.is_candidate,
            }
        )
    error_messages = [err.ui_message() for err in outcome.errors]
    return {"profiles": profile_rows, "errors": error_messages}