Private
Public Access
0
0

feat(audit): alias resolution - all real aggregates now have data

This commit is contained in:
2026-06-22 12:52:22 -04:00
parent 077149011b
commit f7f616abb9
38 changed files with 20604 additions and 3009 deletions
+47 -7
View File
@@ -208,6 +208,44 @@ def _extract_type_name(ann: ast.expr) -> str | None:
return None
ALIAS_TO_BASE: dict[str, str] = {
"Metadata": "dict[str, Any]",
"CommsLogEntry": "Metadata",
"CommsLog": "list[CommsLogEntry]",
"HistoryMessage": "Metadata",
"History": "list[HistoryMessage]",
"FileItem": "Metadata",
"FileItems": "list[FileItem]",
"ToolDefinition": "Metadata",
"ToolCall": "Metadata",
}
def _resolve_aliases(aggregate_name: str | None) -> list[str]:
"""Given a detected aggregate name, resolve to ALL aliases it represents.
For example, if a function returns dict[str, Any], resolve to
[Metadata, CommsLogEntry, HistoryMessage, FileItem, ToolDefinition, ToolCall]
because all of those TypeAliases point to dict[str, Any].
Also resolves transitive: list[CommsLogEntry] -> CommsLog,
because CommsLog: TypeAlias = list[CommsLogEntry].
"""
if aggregate_name is None:
return []
result: list[str] = []
for alias, base in ALIAS_TO_BASE.items():
if aggregate_name in base:
result.append(alias)
elif f"[{aggregate_name}]" in base:
result.append(alias)
elif aggregate_name == alias:
result.append(alias)
if not result and aggregate_name:
result = [aggregate_name]
return result
def P1_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str, int]]:
"""AST pass 1: detect producers of T and Result[T] via return annotations."""
out: list[tuple[str, str, str, str, int]] = []
@@ -217,8 +255,9 @@ def P1_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str, int]]
if node.returns is None:
continue
aggregate = _extract_type_name(node.returns)
if aggregate and aggregate not in ("None", "NoneType"):
out.append((node.name, aggregate, "producer", "high", node.lineno))
for resolved in _resolve_aliases(aggregate):
if resolved not in ("None", "NoneType"):
out.append((node.name, resolved, "producer", "high", node.lineno))
return out
@@ -232,8 +271,9 @@ def P2_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str, int]]
if arg.annotation is None:
continue
aggregate = _extract_type_name(arg.annotation)
if aggregate and aggregate not in ("None", "NoneType"):
out.append((node.name, aggregate, "consumer", "high", node.lineno))
for resolved in _resolve_aliases(aggregate):
if resolved not in ("None", "NoneType"):
out.append((node.name, resolved, "consumer", "high", node.lineno))
return out
@@ -1118,9 +1158,9 @@ def synthesize_aggregate_profile(
)
type_registry = audit_inputs.get("type_registry", {}).get("types", {}) if isinstance(audit_inputs.get("type_registry"), dict) else {}
pattern, _per_pattern_counts, evidence = aggregate_pattern_from_consumers(
consumers, aggregate, type_registry, "src"
consumers[:50], aggregate, type_registry, "src"
)
tac = compute_real_type_alias_coverage(aggregate, producers, consumers, type_registry, "src")
tac = compute_real_type_alias_coverage(aggregate, producers[:50], consumers[:50], type_registry, "src")
from src.code_path_audit_cross_audit import (
aggregate_findings,
build_cross_audit_findings_for_aggregate,
@@ -1145,7 +1185,7 @@ def synthesize_aggregate_profile(
result_consumers=0,
summary=f"{producer_count} producers, {consumer_count} consumers",
)
dc = compute_real_decomposition_cost(aggregate, producers, consumers, pattern, "per_turn", type_registry, "src")
dc = compute_real_decomposition_cost(aggregate, producers[:50], consumers[:50], pattern, "per_turn", type_registry, "src")
candidates = extract_real_optimization_candidates(aggregate, producers, consumers, dc, type_registry, "src")
freq_evidence = tuple(
FrequencyEvidence(function=f, frequency="per_turn", source="static_analysis", note=f"producer from {f.file}")