From 200396e4a56c5efec5cb90d1a8f8a2b88147d79e Mon Sep 17 00:00:00 2001
From: Ed_
Date: Mon, 22 Jun 2026 01:18:54 -0400
Subject: [PATCH] feat(audit): implement Phase 2 PCG (5 tasks: skeleton + P1+P2+P3+build_pcg)

Phase 2 PCG: ProducerConsumerGraph (bipartite aggregate<->function) +
3 AST passes (P1 return-type, P2 parameter-type, P3 field-access) +
build_pcg() main entry returning Result[ProducerConsumerGraph].

14 new unit tests passing (2 PCG + 3 P1 + 3 P2 + 3 P3 + 3 build_pcg).
The build_pcg() function tolerates syntax errors per the stdlib I/O
boundary pattern (records ErrorInfo, continues).

Phase 2 complete: 33 unit tests passing. Phase 3 (MemoryDim classifier
with canonical mappings) next.
---
 src/code_path_audit.py        | 142 +++++++++++++++++++++++++++++-
 tests/test_code_path_audit.py | 158 +++++++++++++++++++++++++++++++++-
 2 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/src/code_path_audit.py b/src/code_path_audit.py
index fc68bd0b..5b379401 100644
--- a/src/code_path_audit.py
+++ b/src/code_path_audit.py
@@ -9,8 +9,11 @@ postfix DSL + markdown + prefix tree text.
 See conductor/tracks/code_path_audit_20260607/spec_v2.md.
 """
 from __future__ import annotations
+import ast
 from dataclasses import dataclass, field
+from pathlib import Path
 from typing import Literal
+from src.result_types import Result, ErrorInfo, ErrorKind
 
 AggregateKind = Literal[
     "typealias",
@@ -145,4 +148,141 @@ class AggregateProfile:
     optimization_candidates: tuple[OptimizationCandidate, ...]
     is_candidate: bool
     mermaid: str = ""
-    markdown: str = ""
\ No newline at end of file
+    markdown: str = ""
+
+@dataclass
+class ProducerConsumerGraph:
+    """Bipartite graph: aggregates <-> functions.
+
+    producers[aggregate] = set of FunctionRef that produce the aggregate.
+    consumers[aggregate] = set of FunctionRef that consume the aggregate.
+    edges[(producer, consumer)] = set of aggregates flowing between them.
+    """
+    edges: dict[tuple[str, str], set[str]] = field(default_factory=dict)
+    producers: dict[str, set[FunctionRef]] = field(default_factory=dict)
+    consumers: dict[str, set[FunctionRef]] = field(default_factory=dict)
+
+    def add_producer(self, aggregate: str, function: FunctionRef) -> None:
+        self.producers.setdefault(aggregate, set()).add(function)
+
+    def add_consumer(self, aggregate: str, function: FunctionRef) -> None:
+        self.consumers.setdefault(aggregate, set()).add(function)
+
+def P1_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
+    """AST pass 1: detect producers of T and Result[T] via return annotations.
+
+    Returns: list of (function_name, aggregate_name, role, confidence).
+    """
+    out: list[tuple[str, str, str, str]] = []
+    for node in ast.walk(tree):
+        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+            continue
+        if node.returns is None:
+            continue
+        ret = node.returns
+        if isinstance(ret, ast.Name):
+            aggregate = ret.id
+            out.append((node.name, aggregate, "producer", "high"))
+        elif isinstance(ret, ast.Subscript):
+            value = ret.value
+            sl = ret.slice
+            if isinstance(value, ast.Name) and value.id == "Result":
+                if isinstance(sl, ast.Name):
+                    out.append((node.name, sl.id, "producer", "high"))
+                elif isinstance(sl, ast.Subscript) and isinstance(sl.value, ast.Name):
+                    out.append((node.name, sl.value.id, "producer", "high"))
+    return out
+
+def P2_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
+    """AST pass 2: detect consumers of typed aggregates via parameter annotations.
+
+    Returns: list of (function_name, aggregate_name, role, confidence).
+    """
+    out: list[tuple[str, str, str, str]] = []
+    for node in ast.walk(tree):
+        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+            continue
+        for arg in node.args.posonlyargs + node.args.args + node.args.kwonlyargs:  # incl. `/`-delimited params
+            if arg.annotation is None:
+                continue
+            ann = arg.annotation
+            if isinstance(ann, ast.Name):
+                out.append((node.name, ann.id, "consumer", "high"))
+            elif isinstance(ann, ast.Subscript):
+                if isinstance(ann.value, ast.Name) and ann.value.id in ("list", "List"):
+                    sl = ann.slice
+                    if isinstance(sl, ast.Name):
+                        out.append((node.name, sl.id, "consumer", "high"))
+    return out
+
+def P3_pass(tree: ast.Module, file: str, type_registry: dict[str, list[str]]) -> list[tuple[str, str, str, int]]:
+    """AST pass 3: detect field accesses via entry['key'] or entry.attr.
+
+    Returns: list of (function_name, key_or_attr, kind, count).
+    type_registry is currently unused (the field-to-aggregate mapping
+    is computed in Phase 7 by the cross-audit integration); P3 only
+    records the field access itself.
+    """
+    out: list[tuple[str, str, str, int]] = []
+    for node in ast.walk(tree):
+        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+            continue
+        counts: dict[tuple[str, str], int] = {}
+        for sub in ast.walk(node):
+            if isinstance(sub, ast.Subscript):
+                if isinstance(sub.value, ast.Name) and isinstance(sub.slice, ast.Constant) and isinstance(sub.slice.value, str):
+                    k = ("subscript", sub.slice.value)
+                    counts[k] = counts.get(k, 0) + 1
+            elif isinstance(sub, ast.Attribute):
+                if isinstance(sub.value, ast.Name):
+                    k = ("attribute", sub.attr)
+                    counts[k] = counts.get(k, 0) + 1
+        for (kind, key), c in counts.items():
+            out.append((node.name, key, kind, c))
+    return out
+
+def build_pcg(src_dir: str, type_registry: dict[str, list[str]] | None = None) -> Result[ProducerConsumerGraph]:
+    """Build the ProducerConsumerGraph by AST-walking src/.
+
+    Returns Result[PCG]. Syntax errors in individual files are
+    tolerated; the file is skipped and an ErrorInfo is added.
+    """
+    pcg = ProducerConsumerGraph()
+    # NOTE: type_registry is not consumed yet; P3 (the only pass taking it) is not run here.
+    errors: list[ErrorInfo] = []
+    for py_file in sorted(Path(src_dir).rglob("*.py")):  # sorted: rglob order is filesystem-dependent
+        if "__pycache__" in py_file.parts:
+            continue
+        try:
+            source = py_file.read_text(encoding="utf-8")
+        except (OSError, UnicodeDecodeError) as e:
+            errors.append(ErrorInfo(
+                kind=ErrorKind.INTERNAL,
+                message=f"Cannot read {py_file}: {e}",
+                source="build_pcg",
+                original=e,
+            ))
+            continue
+        try:
+            tree = ast.parse(source)
+        except SyntaxError as e:
+            errors.append(ErrorInfo(
+                kind=ErrorKind.INVALID_INPUT,
+                message=f"Syntax error in {py_file}: {e}",
+                source="build_pcg",
+                original=e,
+            ))
+            continue
+        file = str(py_file)
+        fqname_prefix = file.removesuffix(".py").replace("/", ".").replace("\\", ".")
+        for fn, agg, role, _conf in P1_pass(tree, file):
+            fref = FunctionRef(fqname=fqname_prefix + "." + fn, file=file, line=0, role=role)
+            if role == "producer":
+                pcg.add_producer(agg, fref)
+        for fn, agg, role, _conf in P2_pass(tree, file):
+            fref = FunctionRef(fqname=fqname_prefix + "." + fn, file=file, line=0, role=role)
+            if role == "consumer":
+                pcg.add_consumer(agg, fref)
+        # P3_pass is intentionally not run here: its result was discarded, and per its
+        # docstring the field-to-aggregate mapping that would use it is a later phase.
+    return Result(data=pcg, errors=errors)
diff --git a/tests/test_code_path_audit.py b/tests/test_code_path_audit.py
index caaf9499..8ffc4e28 100644
--- a/tests/test_code_path_audit.py
+++ b/tests/test_code_path_audit.py
@@ -1,5 +1,9 @@
 """Tests for src.code_path_audit v2 - Phase 1 (data model)."""
 from __future__ import annotations
+import ast
+import textwrap
+import tempfile
+from pathlib import Path
 import pytest
 from src.code_path_audit import (
     AggregateKind,
@@ -17,7 +21,13 @@ from src.code_path_audit import (
     DecompositionCost,
     OptimizationCandidate,
     AggregateProfile,
+    ProducerConsumerGraph,
+    P1_pass,
+    P2_pass,
+    P3_pass,
+    build_pcg,
 )
+from src.result_types import Result, ErrorInfo, ErrorKind
 
 def test_aggregate_kind_4_values() -> None:
     """AggregateKind is a Literal with 4 values: typealias, dataclass, candidate_dataclass, builtin."""
@@ -270,4 +280,150 @@ def test_aggregate_profile_is_candidate_true() -> None:
     assert profile.is_candidate is True
     assert profile.aggregate_kind == "candidate_dataclass"
     assert profile.producers == ()
-    assert profile.consumers == ()
\ No newline at end of file
+    assert profile.consumers == ()
+
+def test_pcg_init_empty() -> None:
+    """ProducerConsumerGraph starts with empty edges and producers/consumers dicts."""
+    pcg = ProducerConsumerGraph()
+    assert pcg.edges == {}
+    assert pcg.producers == {}
+    assert pcg.consumers == {}
+
+def test_pcg_add_producer_consumer() -> None:
+    """add_producer + add_consumer add to the bipartite graph."""
+    pcg = ProducerConsumerGraph()
+    f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
+    pcg.add_producer("Metadata", f)
+    pcg.add_consumer("Metadata", f)
+    assert "Metadata" in pcg.producers
+    assert "Metadata" in pcg.consumers
+    assert f in pcg.producers["Metadata"]
+    assert f in pcg.consumers["Metadata"]
+
+def test_p1_pass_finds_producer_of_T() -> None:
+    """P1 detects a function whose return annotation is a TypeAlias name (producer of T)."""
+    source = textwrap.dedent('''
+        def send_result() -> Metadata:
+            return {}
+    ''')
+    tree = ast.parse(source)
+    producers = P1_pass(tree, file="synthetic.py")
+    assert ("send_result", "Metadata", "producer", "high") in producers
+
+def test_p1_pass_finds_producer_of_Result_T() -> None:
+    """P1 detects a function whose return annotation is Result[T] (producer of T)."""
+    source = textwrap.dedent('''
+        def fetch() -> Result[FileItems]:
+            return Result(data=[])
+    ''')
+    tree = ast.parse(source)
+    producers = P1_pass(tree, file="synthetic.py")
+    assert ("fetch", "FileItems", "producer", "high") in producers
+
+def test_p1_pass_skips_non_annotated_return() -> None:
+    """P1 returns [] for functions without return annotations."""
+    source = textwrap.dedent('''
+        def unannotated():
+            return {}
+    ''')
+    tree = ast.parse(source)
+    producers = P1_pass(tree, file="synthetic.py")
+    assert producers == []
+
+def test_p2_pass_finds_consumer_of_T() -> None:
+    """P2 detects a function whose parameter is a TypeAlias name (consumer of T)."""
+    source = textwrap.dedent('''
+        def process(entry: Metadata) -> None:
+            pass
+    ''')
+    tree = ast.parse(source)
+    consumers = P2_pass(tree, file="synthetic.py")
+    assert ("process", "Metadata", "consumer", "high") in consumers
+
+def test_p2_pass_finds_consumer_of_list_T() -> None:
+    """P2 detects a function whose parameter is list[T] (consumer of T)."""
+    source = textwrap.dedent('''
+        def aggregate(items: list[FileItems]) -> None:
+            pass
+    ''')
+    tree = ast.parse(source)
+    consumers = P2_pass(tree, file="synthetic.py")
+    assert ("aggregate", "FileItems", "consumer", "high") in consumers
+
+def test_p2_pass_skips_untyped_parameter() -> None:
+    """P2 returns [] for parameters without type annotations."""
+    source = textwrap.dedent('''
+        def process(entry) -> None:
+            pass
+    ''')
+    tree = ast.parse(source)
+    consumers = P2_pass(tree, file="synthetic.py")
+    assert consumers == []
+
+def test_p3_pass_finds_consumer_via_subscript() -> None:
+    """P3 detects a function that reads entry['path']; without a type registry, returns the field name only."""
+    source = textwrap.dedent('''
+        def process(entry) -> None:
+            path = entry['path']
+    ''')
+    tree = ast.parse(source)
+    accesses = P3_pass(tree, file="synthetic.py", type_registry={})
+    assert ("process", "path", "subscript", 1) in accesses
+
+def test_p3_pass_finds_consumer_via_attribute() -> None:
+    """P3 detects a function that reads entry.attr; returns (function, attr, kind, count)."""
+    source = textwrap.dedent('''
+        def process(entry) -> None:
+            path = entry.path
+    ''')
+    tree = ast.parse(source)
+    accesses = P3_pass(tree, file="synthetic.py", type_registry={})
+    assert ("process", "path", "attribute", 1) in accesses
+
+def test_p3_pass_counts_multiple_accesses() -> None:
+    """P3 counts multiple accesses to the same key within a single function."""
+    source = textwrap.dedent('''
+        def process(entry) -> None:
+            a = entry['path']
+            b = entry['path']
+            c = entry['view_mode']
+    ''')
+    tree = ast.parse(source)
+    accesses = P3_pass(tree, file="synthetic.py", type_registry={})
+    path_count = sum(c for fn, k, kind, c in accesses if fn == "process" and k == "path")
+    assert path_count == 2
+
+def test_build_pcg_returns_result() -> None:
+    """build_pcg returns Result[ProducerConsumerGraph] per error_handling.md."""
+    with tempfile.TemporaryDirectory() as tmp:
+        (Path(tmp) / "mod.py").write_text(textwrap.dedent('''
+            from src.type_aliases import Metadata
+
+            def produce() -> Metadata:
+                return {}
+        '''))
+        result = build_pcg(tmp)
+        assert isinstance(result, Result)
+        assert result.ok
+
+def test_build_pcg_finds_producer_via_p1() -> None:
+    """build_pcg correctly identifies a producer of Metadata via P1."""
+    with tempfile.TemporaryDirectory() as tmp:
+        (Path(tmp) / "mod.py").write_text(textwrap.dedent('''
+            from src.type_aliases import Metadata
+
+            def produce() -> Metadata:
+                return {}
+        '''))
+        pcg = build_pcg(tmp).data
+        assert "Metadata" in pcg.producers
+
+def test_build_pcg_tolerates_syntax_errors() -> None:
+    """build_pcg records syntax errors as ErrorInfo (boundary pattern); Result.ok is False."""
+    with tempfile.TemporaryDirectory() as tmp:
+        (Path(tmp) / "bad.py").write_text("def unclosed(:\n pass")
+        result = build_pcg(tmp)
+        assert not result.ok
+        assert len(result.errors) >= 1
+        assert isinstance(result.errors[0], ErrorInfo)
+        assert result.data is not None