feat(audit): implement Phase 2 PCG (5 tasks: skeleton + P1+P2+P3+build_pcg)
Phase 2 PCG: ProducerConsumerGraph (bipartite aggregate<->function) + 3 AST passes (P1 return-type, P2 parameter-type, P3 field-access) + build_pcg() main entry returning Result[ProducerConsumerGraph]. 14 new unit tests passing (2 PCG + 3 P1 + 3 P2 + 3 P3 + 3 build_pcg). The build_pcg() function tolerates syntax errors per the stdlib I/O boundary pattern (records ErrorInfo, continues). Phase 2 complete: 33 unit tests passing. Phase 3 (MemoryDim classifier with canonical mappings) next.
This commit is contained in:
+141
-1
@@ -9,8 +9,11 @@ postfix DSL + markdown + prefix tree text. See
|
|||||||
conductor/tracks/code_path_audit_20260607/spec_v2.md.
|
conductor/tracks/code_path_audit_20260607/spec_v2.md.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import ast
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
|
from src.result_types import Result, ErrorInfo, ErrorKind
|
||||||
|
|
||||||
AggregateKind = Literal[
|
AggregateKind = Literal[
|
||||||
"typealias",
|
"typealias",
|
||||||
@@ -145,4 +148,141 @@ class AggregateProfile:
|
|||||||
optimization_candidates: tuple[OptimizationCandidate, ...]
|
optimization_candidates: tuple[OptimizationCandidate, ...]
|
||||||
is_candidate: bool
|
is_candidate: bool
|
||||||
mermaid: str = ""
|
mermaid: str = ""
|
||||||
markdown: str = ""
|
markdown: str = ""
|
||||||
|
|
||||||
|
@dataclass
class ProducerConsumerGraph:
    """Two-sided graph relating aggregates to the functions that touch them.

    Each instance owns three independent dictionaries, all empty on
    construction:

    * ``edges`` - keyed by a ``(producer, consumer)`` pair, holding the set
      of aggregate names flowing between the two.
    * ``producers`` - keyed by aggregate name, holding the FunctionRef
      objects that produce that aggregate.
    * ``consumers`` - keyed by aggregate name, holding the FunctionRef
      objects that consume that aggregate.
    """

    edges: dict[tuple[str, str], set[str]] = field(default_factory=dict)
    producers: dict[str, set[FunctionRef]] = field(default_factory=dict)
    consumers: dict[str, set[FunctionRef]] = field(default_factory=dict)

    def add_producer(self, aggregate: str, function: FunctionRef) -> None:
        """Register ``function`` as a producer of ``aggregate``."""
        bucket = self.producers.get(aggregate)
        if bucket is None:
            # First producer seen for this aggregate: create its set.
            bucket = self.producers[aggregate] = set()
        bucket.add(function)

    def add_consumer(self, aggregate: str, function: FunctionRef) -> None:
        """Register ``function`` as a consumer of ``aggregate``."""
        bucket = self.consumers.get(aggregate)
        if bucket is None:
            # First consumer seen for this aggregate: create its set.
            bucket = self.consumers[aggregate] = set()
        bucket.add(function)
|
||||||
|
|
||||||
|
def P1_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
    """AST pass 1: find producers of T and Result[T] from return annotations.

    Every (sync or async) function definition reachable from ``tree`` is
    inspected. A row is emitted when the return annotation is either a bare
    name ``T`` or ``Result[...]`` whose argument is a bare name or a
    subscripted name (in which case the outer name is recorded). Any other
    annotation shape, or a missing annotation, yields nothing.

    ``file`` is accepted for signature parity with the other passes and is
    not read here.

    Returns: list of (function_name, aggregate_name, "producer", "high").
    """
    found: list[tuple[str, str, str, str]] = []
    for fn in ast.walk(tree):
        if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        annotation = fn.returns
        produced = None
        if isinstance(annotation, ast.Name):
            # def f() -> T
            produced = annotation.id
        elif (
            isinstance(annotation, ast.Subscript)
            and isinstance(annotation.value, ast.Name)
            and annotation.value.id == "Result"
        ):
            payload = annotation.slice
            if isinstance(payload, ast.Name):
                # def f() -> Result[T]
                produced = payload.id
            elif isinstance(payload, ast.Subscript) and isinstance(payload.value, ast.Name):
                # def f() -> Result[G[...]]: record the outer generic name G.
                produced = payload.value.id
        if produced is not None:
            found.append((fn.name, produced, "producer", "high"))
    return found
|
||||||
|
|
||||||
|
def P2_pass(tree: ast.Module, file: str) -> list[tuple[str, str, str, str]]:
    """AST pass 2: detect consumers of typed aggregates via parameter annotations.

    Every (sync or async) function definition reachable from ``tree`` is
    inspected. Positional-only, regular and keyword-only parameters are
    examined in signature order; a row is emitted when the annotation is a
    bare name ``T`` or ``list[T]`` / ``List[T]`` with a bare-name element.
    Unannotated parameters and other annotation shapes are ignored.

    ``file`` is accepted for signature parity with the other passes and is
    not read here.

    Returns: list of (function_name, aggregate_name, role, confidence).
    """
    out: list[tuple[str, str, str, str]] = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        signature = node.args
        # posonlyargs are stored separately from args; without them a
        # parameter declared before "/" would never be reported.
        params = signature.posonlyargs + signature.args + signature.kwonlyargs
        for arg in params:
            ann = arg.annotation
            if ann is None:
                continue
            if isinstance(ann, ast.Name):
                out.append((node.name, ann.id, "consumer", "high"))
            elif (
                isinstance(ann, ast.Subscript)
                and isinstance(ann.value, ast.Name)
                and ann.value.id in ("list", "List")
                and isinstance(ann.slice, ast.Name)
            ):
                out.append((node.name, ann.slice.id, "consumer", "high"))
    return out
|
||||||
|
|
||||||
|
def P3_pass(tree: ast.Module, file: str, type_registry: dict[str, list[str]]) -> list[tuple[str, str, str, int]]:
    """AST pass 3: tally field accesses of the form name['key'] or name.attr.

    For each (sync or async) function definition reachable from ``tree``,
    the whole subtree of that function is walked and two access shapes are
    counted: a subscript of a bare name with a string-constant key, and an
    attribute read off a bare name. Because the subtree walk descends into
    nested definitions, accesses inside a nested function are tallied for
    the enclosing function as well as for the nested one.

    ``file`` and ``type_registry`` are not read here; P3 only records the
    access itself, not which aggregate the field belongs to.

    Returns: list of (function_name, key_or_attr, kind, count), where kind
    is "subscript" or "attribute".
    """
    rows: list[tuple[str, str, str, int]] = []
    definitions = (
        candidate
        for candidate in ast.walk(tree)
        if isinstance(candidate, (ast.FunctionDef, ast.AsyncFunctionDef))
    )
    for fn in definitions:
        # (kind, name) -> number of occurrences, in first-seen order.
        tally: dict[tuple[str, str], int] = {}
        for inner in ast.walk(fn):
            if isinstance(inner, ast.Subscript):
                index = inner.slice
                if not (
                    isinstance(inner.value, ast.Name)
                    and isinstance(index, ast.Constant)
                    and isinstance(index.value, str)
                ):
                    continue
                access = ("subscript", index.value)
            elif isinstance(inner, ast.Attribute):
                if not isinstance(inner.value, ast.Name):
                    continue
                access = ("attribute", inner.attr)
            else:
                continue
            tally[access] = tally.get(access, 0) + 1
        rows.extend((fn.name, name, kind, hits) for (kind, name), hits in tally.items())
    return rows
|
||||||
|
|
||||||
|
def build_pcg(src_dir: str, type_registry: dict[str, list[str]] | None = None) -> Result[ProducerConsumerGraph]:
    """Build the ProducerConsumerGraph by AST-walking every ``*.py`` under src_dir.

    Files are visited in sorted path order so the recorded error list is
    deterministic across filesystems. For each file that can be read and
    parsed, P1 rows are added as producers and P2 rows as consumers.

    Unreadable files (OSError / UnicodeDecodeError) and unparsable files
    are tolerated: the file is skipped and an ErrorInfo is appended.

    ``type_registry`` is accepted for interface stability; it is not read
    by this function.

    Returns Result[PCG] carrying the graph as ``data`` and the collected
    ErrorInfo entries as ``errors``.
    """
    pcg = ProducerConsumerGraph()
    errors: list[ErrorInfo] = []
    for py_file in sorted(Path(src_dir).rglob("*.py")):
        if "__pycache__" in str(py_file):
            continue
        try:
            source = py_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            errors.append(ErrorInfo(
                kind=ErrorKind.INTERNAL,
                message=f"Cannot read {py_file}: {e}",
                source="build_pcg",
                original=e,
            ))
            continue
        try:
            tree = ast.parse(source)
        except (SyntaxError, ValueError) as e:
            # ValueError: on Python < 3.12 source containing null bytes is
            # rejected with ValueError rather than SyntaxError; treat it as
            # the same "unparsable file" case instead of aborting the walk.
            errors.append(ErrorInfo(
                kind=ErrorKind.INVALID_INPUT,
                message=f"Syntax error in {py_file}: {e}",
                source="build_pcg",
                original=e,
            ))
            continue
        file = str(py_file)
        # Dotted prefix derived from the path as given (either separator).
        fqname_prefix = file.removesuffix(".py").replace("/", ".").replace("\\", ".")
        for fn, agg, role, _conf in P1_pass(tree, file):
            if role == "producer":
                pcg.add_producer(agg, FunctionRef(fqname=fqname_prefix + "." + fn, file=file, line=0, role=role))
        for fn, agg, role, _conf in P2_pass(tree, file):
            if role == "consumer":
                pcg.add_consumer(agg, FunctionRef(fqname=fqname_prefix + "." + fn, file=file, line=0, role=role))
        # P3_pass is intentionally not invoked here: its rows were discarded
        # by the previous implementation, so calling it only cost an extra
        # AST walk per file without affecting the graph.
    return Result(data=pcg, errors=errors)
|
||||||
@@ -1,5 +1,9 @@
|
|||||||
"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
|
"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import ast
|
||||||
|
import textwrap
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
import pytest
|
import pytest
|
||||||
from src.code_path_audit import (
|
from src.code_path_audit import (
|
||||||
AggregateKind,
|
AggregateKind,
|
||||||
@@ -17,7 +21,13 @@ from src.code_path_audit import (
|
|||||||
DecompositionCost,
|
DecompositionCost,
|
||||||
OptimizationCandidate,
|
OptimizationCandidate,
|
||||||
AggregateProfile,
|
AggregateProfile,
|
||||||
|
ProducerConsumerGraph,
|
||||||
|
P1_pass,
|
||||||
|
P2_pass,
|
||||||
|
P3_pass,
|
||||||
|
build_pcg,
|
||||||
)
|
)
|
||||||
|
from src.result_types import Result, ErrorInfo, ErrorKind
|
||||||
|
|
||||||
def test_aggregate_kind_4_values() -> None:
|
def test_aggregate_kind_4_values() -> None:
|
||||||
"""AggregateKind is a Literal with 4 values: typealias, dataclass, candidate_dataclass, builtin."""
|
"""AggregateKind is a Literal with 4 values: typealias, dataclass, candidate_dataclass, builtin."""
|
||||||
@@ -270,4 +280,150 @@ def test_aggregate_profile_is_candidate_true() -> None:
|
|||||||
assert profile.is_candidate is True
|
assert profile.is_candidate is True
|
||||||
assert profile.aggregate_kind == "candidate_dataclass"
|
assert profile.aggregate_kind == "candidate_dataclass"
|
||||||
assert profile.producers == ()
|
assert profile.producers == ()
|
||||||
assert profile.consumers == ()
|
assert profile.consumers == ()
|
||||||
|
|
||||||
|
def test_pcg_init_empty() -> None:
    """A freshly built ProducerConsumerGraph has empty edges, producers and consumers."""
    graph = ProducerConsumerGraph()
    for mapping in (graph.edges, graph.producers, graph.consumers):
        assert mapping == {}
|
||||||
|
|
||||||
|
def test_pcg_add_producer_consumer() -> None:
    """add_producer and add_consumer each file the function under the aggregate name."""
    graph = ProducerConsumerGraph()
    ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
    graph.add_producer("Metadata", ref)
    graph.add_consumer("Metadata", ref)
    for side in (graph.producers, graph.consumers):
        assert "Metadata" in side
    for side in (graph.producers, graph.consumers):
        assert ref in side["Metadata"]
|
||||||
|
|
||||||
|
def test_p1_pass_finds_producer_of_T() -> None:
    """P1 reports a function annotated ``-> Metadata`` as a producer of Metadata."""
    module = ast.parse(textwrap.dedent('''
        def send_result() -> Metadata:
            return {}
    '''))
    expected = ("send_result", "Metadata", "producer", "high")
    assert expected in P1_pass(module, file="synthetic.py")
|
||||||
|
|
||||||
|
def test_p1_pass_finds_producer_of_Result_T() -> None:
    """P1 reports a function annotated ``-> Result[FileItems]`` as a producer of FileItems."""
    module = ast.parse(textwrap.dedent('''
        def fetch() -> Result[FileItems]:
            return Result(data=[])
    '''))
    expected = ("fetch", "FileItems", "producer", "high")
    assert expected in P1_pass(module, file="synthetic.py")
|
||||||
|
|
||||||
|
def test_p1_pass_skips_non_annotated_return() -> None:
    """P1 yields an empty list when the only function has no return annotation."""
    module = ast.parse(textwrap.dedent('''
        def unannotated():
            return {}
    '''))
    assert P1_pass(module, file="synthetic.py") == []
|
||||||
|
|
||||||
|
def test_p2_pass_finds_consumer_of_T() -> None:
    """P2 reports a function with a ``Metadata``-annotated parameter as a consumer of Metadata."""
    module = ast.parse(textwrap.dedent('''
        def process(entry: Metadata) -> None:
            pass
    '''))
    expected = ("process", "Metadata", "consumer", "high")
    assert expected in P2_pass(module, file="synthetic.py")
|
||||||
|
|
||||||
|
def test_p2_pass_finds_consumer_of_list_T() -> None:
    """P2 reports a function with a ``list[FileItems]`` parameter as a consumer of FileItems."""
    module = ast.parse(textwrap.dedent('''
        def aggregate(items: list[FileItems]) -> None:
            pass
    '''))
    expected = ("aggregate", "FileItems", "consumer", "high")
    assert expected in P2_pass(module, file="synthetic.py")
|
||||||
|
|
||||||
|
def test_p2_pass_skips_untyped_parameter() -> None:
    """P2 yields an empty list when the only parameter carries no annotation."""
    module = ast.parse(textwrap.dedent('''
        def process(entry) -> None:
            pass
    '''))
    assert P2_pass(module, file="synthetic.py") == []
|
||||||
|
|
||||||
|
def test_p3_pass_finds_consumer_via_subscript() -> None:
    """P3 records a single ``entry['path']`` read as a subscript access on key 'path'."""
    module = ast.parse(textwrap.dedent('''
        def process(entry) -> None:
            path = entry['path']
    '''))
    expected = ("process", "path", "subscript", 1)
    assert expected in P3_pass(module, file="synthetic.py", type_registry={})
|
||||||
|
|
||||||
|
def test_p3_pass_finds_consumer_via_attribute() -> None:
    """P3 records a single ``entry.path`` read as an attribute access on 'path'."""
    module = ast.parse(textwrap.dedent('''
        def process(entry) -> None:
            path = entry.path
    '''))
    expected = ("process", "path", "attribute", 1)
    assert expected in P3_pass(module, file="synthetic.py", type_registry={})
|
||||||
|
|
||||||
|
def test_p3_pass_counts_multiple_accesses() -> None:
    """P3 adds up repeated reads of the same key inside one function."""
    module = ast.parse(textwrap.dedent('''
        def process(entry) -> None:
            a = entry['path']
            b = entry['path']
            c = entry['view_mode']
    '''))
    rows = P3_pass(module, file="synthetic.py", type_registry={})
    path_hits = [count for name, key, _kind, count in rows if name == "process" and key == "path"]
    assert sum(path_hits) == 2
|
||||||
|
|
||||||
|
def test_build_pcg_returns_result() -> None:
    """build_pcg hands back an ok Result for a directory holding one valid module."""
    module_source = textwrap.dedent('''
        from src.type_aliases import Metadata

        def produce() -> Metadata:
            return {}
    ''')
    with tempfile.TemporaryDirectory() as tmp:
        (Path(tmp) / "mod.py").write_text(module_source)
        outcome = build_pcg(tmp)
        assert isinstance(outcome, Result)
        assert outcome.ok
|
||||||
|
|
||||||
|
def test_build_pcg_finds_producer_via_p1() -> None:
    """build_pcg lists Metadata among the producers when a module returns it."""
    module_source = textwrap.dedent('''
        from src.type_aliases import Metadata

        def produce() -> Metadata:
            return {}
    ''')
    with tempfile.TemporaryDirectory() as tmp:
        (Path(tmp) / "mod.py").write_text(module_source)
        graph = build_pcg(tmp).data
        assert "Metadata" in graph.producers
|
||||||
|
|
||||||
|
def test_build_pcg_tolerates_syntax_errors() -> None:
    """An unparsable file is reported as ErrorInfo: ok is False, yet data is still present."""
    with tempfile.TemporaryDirectory() as tmp:
        broken = Path(tmp) / "bad.py"
        broken.write_text("def unclosed(:\n pass")
        outcome = build_pcg(tmp)
        assert not outcome.ok
        assert len(outcome.errors) >= 1
        first_error = outcome.errors[0]
        assert isinstance(first_error, ErrorInfo)
        assert outcome.data is not None
|
||||||
Reference in New Issue
Block a user