Private
Public Access
0
0

feat(audit): per-aggregate cross_audit mapping via PCG file-index

The aggregate_findings function now does 3-tier mapping:
1. Function lookup (find_enclosing_function) -> exact match
2. File-level fallback: if the finding's file has any
   producer/consumer of the aggregate, bucket it there
3. Unbucketed (the file has no aggregate refs)

Handles both 'file' and 'filename' keys (v1 audit scripts use
'filename'; spec fixtures use 'file'). Also adds path
normalization for Windows paths.

Generated the 6 real audit_inputs from scripts/audit_*.py
against real src/. The Metadata aggregate now shows:
- 1 unique weak_types finding (1 site, from ai_client.py:159)
- 1 unique exception_handling finding (76 sites from PARAM_OPTIONAL)

mcp_client.py shows 0 because no Metadata producer/consumer
exists in the PCG for mcp_client (P1/P2 only detect typed
parameter signatures, not internal field access). To close
this gap, the next step is to expand P3 to capture internal
field use.
This commit is contained in:
2026-06-22 09:48:56 -04:00
parent 8d2dffd7c5
commit 67ca680a05
10 changed files with 437 additions and 263 deletions
+22 -20
View File
@@ -1043,8 +1043,15 @@ def synthesize_aggregate_profile(
audit_inputs: dict[str, dict],
overrides: dict,
is_candidate: bool,
_full_pcg_producers: dict[str, list[FunctionRef]] | None = None,
_full_pcg_consumers: dict[str, list[FunctionRef]] | None = None,
) -> AggregateProfile:
"""Synthesize one AggregateProfile."""
"""Synthesize one AggregateProfile.
_full_pcg_producers and _full_pcg_consumers are the full PCG dicts
across all aggregates (used for cross-audit mapping). If not provided,
fall back to this aggregate's refs only.
"""
if is_candidate:
return AggregateProfile(
name=aggregate,
@@ -1082,25 +1089,18 @@ def synthesize_aggregate_profile(
consumers, aggregate, type_registry, "src"
)
tac = compute_real_type_alias_coverage(aggregate, producers, consumers, type_registry, "src")
cross_findings = CrossAuditFindings((), (), (), (), ())
for audit_name in ("audit_weak_types", "audit_exception_handling"):
from src.code_path_audit_cross_audit import (
aggregate_findings,
build_cross_audit_findings_for_aggregate,
)
full_producers = _full_pcg_producers if _full_pcg_producers is not None else pcg_producers
full_consumers = _full_pcg_consumers if _full_pcg_consumers is not None else pcg_consumers
aggregated: dict[str, dict[str, list]] = {}
for audit_name in ("audit_weak_types", "audit_exception_handling", "audit_optional_in_3_files", "audit_no_models_config_io", "audit_main_thread_imports"):
if audit_name in audit_inputs:
findings = audit_inputs[audit_name].get("findings", [])
example_file = findings[0].get("file", "") if findings else ""
example_line = findings[0].get("line", 0) if findings else 0
matched = aggregate_cross_audit_findings(
audit_name=audit_name,
findings=findings,
example_file=example_file,
example_line=example_line,
)
cross_findings = CrossAuditFindings(
weak_types=cross_findings.weak_types + matched.weak_types,
exception_handling=cross_findings.exception_handling + matched.exception_handling,
optional_in_baseline=cross_findings.optional_in_baseline + matched.optional_in_baseline,
config_io_ownership=cross_findings.config_io_ownership + matched.config_io_ownership,
import_graph=cross_findings.import_graph + matched.import_graph,
)
aggregated[audit_name] = aggregate_findings(audit_name, findings, full_producers, full_consumers)
cross_findings = build_cross_audit_findings_for_aggregate(aggregate, aggregated)
producer_count = len({f.fqname for f in producers})
consumer_count = len({f.fqname for f in consumers})
branches_on_errors = set()
@@ -1159,11 +1159,13 @@ def run_audit(
for aggregate in AGGREGATES_IN_SCOPE:
profile = synthesize_aggregate_profile(
aggregate=aggregate,
pcg_producers=pcg.producers,
pcg_consumers=pcg.consumers,
pcg_producers={aggregate: list(pcg.producers.get(aggregate, []))},
pcg_consumers={aggregate: list(pcg.consumers.get(aggregate, []))},
audit_inputs=audit_inputs,
overrides=overrides,
is_candidate=False,
_full_pcg_producers=pcg.producers,
_full_pcg_consumers=pcg.consumers,
)
profiles.append(profile)
for candidate in CANDIDATE_AGGREGATES: