Private
Public Access
0
0

feat(audit): implement Phase 2 PCG (5 tasks: skeleton + P1+P2+P3+build_pcg)

Phase 2 PCG: ProducerConsumerGraph (bipartite aggregate<->function)
+ 3 AST passes (P1 return-type, P2 parameter-type, P3 field-access)
+ build_pcg() main entry returning Result[ProducerConsumerGraph].

14 new unit tests passing (2 PCG + 3 P1 + 3 P2 + 3 P3 + 3 build_pcg).

The build_pcg() function tolerates syntax errors per the stdlib
I/O boundary pattern (records ErrorInfo, continues).

Phase 2 complete: 33 unit tests passing. Phase 3 (MemoryDim
classifier with canonical mappings) next.
This commit is contained in:
2026-06-22 01:18:54 -04:00
parent f79a2b18a6
commit 200396e4a5
2 changed files with 298 additions and 2 deletions
+157 -1
View File
@@ -1,5 +1,9 @@
"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
from __future__ import annotations
import ast
import textwrap
import tempfile
from pathlib import Path
import pytest
from src.code_path_audit import (
AggregateKind,
@@ -17,7 +21,13 @@ from src.code_path_audit import (
DecompositionCost,
OptimizationCandidate,
AggregateProfile,
ProducerConsumerGraph,
P1_pass,
P2_pass,
P3_pass,
build_pcg,
)
from src.result_types import Result, ErrorInfo, ErrorKind
def test_aggregate_kind_4_values() -> None:
"""AggregateKind is a Literal with 4 values: typealias, dataclass, candidate_dataclass, builtin."""
@@ -270,4 +280,150 @@ def test_aggregate_profile_is_candidate_true() -> None:
assert profile.is_candidate is True
assert profile.aggregate_kind == "candidate_dataclass"
assert profile.producers == ()
assert profile.consumers == ()
assert profile.consumers == ()
def test_pcg_init_empty() -> None:
"""ProducerConsumerGraph starts with empty edges and producers/consumers dicts."""
pcg = ProducerConsumerGraph()
assert pcg.edges == {}
assert pcg.producers == {}
assert pcg.consumers == {}
def test_pcg_add_producer_consumer() -> None:
"""add_producer + add_consumer add to the bipartite graph."""
pcg = ProducerConsumerGraph()
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
pcg.add_producer("Metadata", f)
pcg.add_consumer("Metadata", f)
assert "Metadata" in pcg.producers
assert "Metadata" in pcg.consumers
assert f in pcg.producers["Metadata"]
assert f in pcg.consumers["Metadata"]
def test_p1_pass_finds_producer_of_T() -> None:
"""P1 detects a function whose return annotation is a TypeAlias name (producer of T)."""
source = textwrap.dedent('''
def send_result() -> Metadata:
return {}
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert ("send_result", "Metadata", "producer", "high") in producers
def test_p1_pass_finds_producer_of_Result_T() -> None:
"""P1 detects a function whose return annotation is Result[T] (producer of T)."""
source = textwrap.dedent('''
def fetch() -> Result[FileItems]:
return Result(data=[])
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert ("fetch", "FileItems", "producer", "high") in producers
def test_p1_pass_skips_non_annotated_return() -> None:
"""P1 returns [] for functions without return annotations."""
source = textwrap.dedent('''
def unannotated():
return {}
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert producers == []
def test_p2_pass_finds_consumer_of_T() -> None:
"""P2 detects a function whose parameter is a TypeAlias name (consumer of T)."""
source = textwrap.dedent('''
def process(entry: Metadata) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert ("process", "Metadata", "consumer", "high") in consumers
def test_p2_pass_finds_consumer_of_list_T() -> None:
"""P2 detects a function whose parameter is list[T] (consumer of T)."""
source = textwrap.dedent('''
def aggregate(items: list[FileItems]) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert ("aggregate", "FileItems", "consumer", "high") in consumers
def test_p2_pass_skips_untyped_parameter() -> None:
"""P2 returns [] for parameters without type annotations."""
source = textwrap.dedent('''
def process(entry) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert consumers == []
def test_p3_pass_finds_consumer_via_subscript() -> None:
"""P3 detects a function that reads entry['path']; without a type registry, returns the field name only."""
source = textwrap.dedent('''
def process(entry) -> None:
path = entry['path']
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
assert ("process", "path", "subscript", 1) in accesses
def test_p3_pass_finds_consumer_via_attribute() -> None:
"""P3 detects a function that reads entry.attr; returns (function, attr, kind, count)."""
source = textwrap.dedent('''
def process(entry) -> None:
path = entry.path
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
assert ("process", "path", "attribute", 1) in accesses
def test_p3_pass_counts_multiple_accesses() -> None:
"""P3 counts multiple accesses to the same key within a single function."""
source = textwrap.dedent('''
def process(entry) -> None:
a = entry['path']
b = entry['path']
c = entry['view_mode']
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
path_count = sum(c for fn, k, kind, c in accesses if fn == "process" and k == "path")
assert path_count == 2
def test_build_pcg_returns_result() -> None:
"""build_pcg returns Result[ProducerConsumerGraph] per error_handling.md."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "mod.py").write_text(textwrap.dedent('''
from src.type_aliases import Metadata
def produce() -> Metadata:
return {}
'''))
result = build_pcg(tmp)
assert isinstance(result, Result)
assert result.ok
def test_build_pcg_finds_producer_via_p1() -> None:
"""build_pcg correctly identifies a producer of Metadata via P1."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "mod.py").write_text(textwrap.dedent('''
from src.type_aliases import Metadata
def produce() -> Metadata:
return {}
'''))
pcg = build_pcg(tmp).data
assert "Metadata" in pcg.producers
def test_build_pcg_tolerates_syntax_errors() -> None:
"""build_pcg records syntax errors as ErrorInfo (boundary pattern); Result.ok is False."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "bad.py").write_text("def unclosed(:\n pass")
result = build_pcg(tmp)
assert not result.ok
assert len(result.errors) >= 1
assert isinstance(result.errors[0], ErrorInfo)
assert result.data is not None