Private
Public Access
0
0
Files
manual_slop/tests/test_code_path_audit.py
T
ed ef207cf684 feat(audit): complete Phase 1 data model (8 dataclasses, 12 new tests)
Tasks 1.3-1.10: AccessPatternEvidence, FrequencyEvidence,
ResultCoverage, TypeAliasCoverage, CrossAuditFinding,
CrossAuditFindings, DecompositionCost, OptimizationCandidate,
AggregateProfile. All frozen dataclasses per error_handling.md
Pattern 1 (immutability for cross-thread safety).

Phase 1 complete: 19 unit tests passing (5 enum tests + 14
dataclass tests). AggregateProfile is the central artifact with
14 required fields + 2 optional (mermaid, markdown).

Phase 2 (PCG - 3 AST passes + build_pcg()) next.
2026-06-22 01:10:57 -04:00

273 lines
9.6 KiB
Python

"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
from __future__ import annotations

from typing import get_args

import pytest

from src.code_path_audit import (
    AggregateKind,
    MemoryDim,
    AccessPattern,
    Frequency,
    RecommendedDirection,
    FunctionRef,
    AccessPatternEvidence,
    FrequencyEvidence,
    ResultCoverage,
    TypeAliasCoverage,
    CrossAuditFinding,
    CrossAuditFindings,
    DecompositionCost,
    OptimizationCandidate,
    AggregateProfile,
)
def test_aggregate_kind_4_values() -> None:
    """AggregateKind is a Literal with exactly four members.

    The members are read through ``typing.get_args``, the documented
    introspection helper, instead of reaching into the ``__args__`` dunder.
    """
    expected = {"typealias", "dataclass", "candidate_dataclass", "builtin"}
    assert set(get_args(AggregateKind)) == expected
def test_memory_dim_7_values() -> None:
    """MemoryDim is a Literal with exactly seven members.

    The members are read through ``typing.get_args``, the documented
    introspection helper, instead of reaching into the ``__args__`` dunder.
    """
    expected = {"curation", "discussion", "rag", "knowledge", "config", "control", "unknown"}
    assert set(get_args(MemoryDim)) == expected
def test_access_pattern_5_values() -> None:
    """AccessPattern is a Literal with exactly five members.

    The members are read through ``typing.get_args``, the documented
    introspection helper, instead of reaching into the ``__args__`` dunder.
    """
    expected = {"whole_struct", "field_by_field", "hot_cold_split", "bulk_batched", "mixed"}
    assert set(get_args(AccessPattern)) == expected
def test_frequency_7_values() -> None:
    """Frequency is a Literal with exactly seven members.

    The members are read through ``typing.get_args``, the documented
    introspection helper, instead of reaching into the ``__args__`` dunder.
    """
    expected = {"hot", "per_turn", "per_discussion", "per_request", "cold", "init", "unknown"}
    assert set(get_args(Frequency)) == expected
def test_recommended_direction_4_values() -> None:
    """RecommendedDirection is a Literal with exactly four members.

    The members are read through ``typing.get_args``, the documented
    introspection helper, instead of reaching into the ``__args__`` dunder.
    """
    expected = {"componentize", "unify", "hold", "insufficient_data"}
    assert set(get_args(RecommendedDirection)) == expected
def test_function_ref_4_fields() -> None:
    """FunctionRef returns fqname, file, line and role exactly as constructed."""
    given = {
        "fqname": "src.ai_client.AIClient.send_result",
        "file": "src/ai_client.py",
        "line": 100,
        "role": "producer",
    }
    ref = FunctionRef(**given)
    # Each keyword passed in must read back unchanged from the same-named attribute.
    for attr, value in given.items():
        assert getattr(ref, attr) == value, attr
def test_function_ref_frozen() -> None:
    """FunctionRef rejects attribute assignment after construction.

    Assigning to a field of a frozen dataclass raises
    ``dataclasses.FrozenInstanceError``, a subclass of ``AttributeError``.
    """
    ref = FunctionRef(
        fqname="src.x.y",
        file="src/x.py",
        line=1,
        role="consumer",
    )
    # Expect AttributeError specifically: also accepting Exception would let
    # any unrelated error satisfy the test.
    with pytest.raises(AttributeError) as exc_info:
        ref.fqname = "src.z.w"
    message = str(exc_info.value).lower()
    assert "frozen" in message or "cannot assign" in message
    # The rejected write must leave the original value in place.
    assert ref.fqname == "src.x.y"
def test_access_pattern_evidence_4_fields() -> None:
    """AccessPatternEvidence exposes function, pattern, field_accesses and confidence."""
    consumer = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
    evidence = AccessPatternEvidence(
        function=consumer,
        pattern="field_by_field",
        field_accesses={"path": 3, "view_mode": 2},
        confidence="high",
    )
    # The FunctionRef must come back as the very same object.
    assert evidence.function is consumer
    expected_values = {
        "pattern": "field_by_field",
        "field_accesses": {"path": 3, "view_mode": 2},
        "confidence": "high",
    }
    for attr, value in expected_values.items():
        assert getattr(evidence, attr) == value, attr
def test_frequency_evidence_4_fields() -> None:
    """FrequencyEvidence exposes function, frequency, source and an explicit note."""
    func = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="both")
    scalars = {
        "frequency": "per_turn",
        "source": "entry_point",
        "note": "called per LLM turn",
    }
    evidence = FrequencyEvidence(function=func, **scalars)
    # The FunctionRef must come back as the very same object.
    assert evidence.function is func
    for attr, value in scalars.items():
        assert getattr(evidence, attr) == value, attr
def test_frequency_evidence_default_note() -> None:
    """Omitting ``note`` leaves FrequencyEvidence.note as the empty string."""
    func = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
    evidence = FrequencyEvidence(
        function=func,
        frequency="cold",
        source="control_flow_position",
    )
    assert evidence.note == ""
def test_result_coverage_5_fields() -> None:
    """ResultCoverage exposes four producer/consumer counts plus a summary string."""
    counts = {
        "total_producers": 12,
        "result_producers": 5,
        "total_consumers": 15,
        "result_consumers": 8,
    }
    coverage = ResultCoverage(
        **counts,
        summary="5/12 producers return Result[T] (42%); 8/15 consumers branch on .errors (53%)",
    )
    for attr, value in counts.items():
        assert getattr(coverage, attr) == value, attr
    # The summary is only checked for the two percentages it must mention.
    for percentage in ("42%", "53%"):
        assert percentage in coverage.summary
def test_type_alias_coverage_4_fields() -> None:
    """TypeAliasCoverage exposes total, typed and untyped site counts plus a summary."""
    counts = {"total_sites": 45, "typed_sites": 38, "untyped_sites": 7}
    coverage = TypeAliasCoverage(
        **counts,
        summary="45 total sites; 38 typed (84%); 7 untyped (16%)",
    )
    for attr, value in counts.items():
        assert getattr(coverage, attr) == value, attr
    # The summary is only checked for the two percentages it must mention.
    for percentage in ("84%", "16%"):
        assert percentage in coverage.summary
def test_cross_audit_finding_5_fields() -> None:
    """CrossAuditFinding exposes audit_script, site_count, example_file, example_line, note."""
    given = {
        "audit_script": "audit_weak_types",
        "site_count": 12,
        "example_file": "src/ai_client.py",
        "example_line": 100,
        "note": "12 weak-type sites in producer+consumer functions",
    }
    finding = CrossAuditFinding(**given)
    # Each keyword passed in must read back unchanged from the same-named attribute.
    for attr, value in given.items():
        assert getattr(finding, attr) == value, attr
def test_cross_audit_finding_default_note() -> None:
    """Omitting ``note`` leaves CrossAuditFinding.note as the empty string."""
    required_only = {
        "audit_script": "audit_optional_in_3_files",
        "site_count": 0,
        "example_file": "",
        "example_line": 0,
    }
    finding = CrossAuditFinding(**required_only)
    assert finding.note == ""
def test_cross_audit_findings_5_audit_scripts() -> None:
    """CrossAuditFindings has five per-audit-script fields; each accepts an empty tuple."""
    script_fields = (
        "weak_types",
        "exception_handling",
        "optional_in_baseline",
        "config_io_ownership",
        "import_graph",
    )
    findings = CrossAuditFindings(**{field: () for field in script_fields})
    for field in script_fields:
        assert getattr(findings, field) == (), field
def test_decomposition_cost_8_fields() -> None:
    """DecompositionCost exposes the eight fields passed to its constructor."""
    rationale = "whole_struct access on a frozen dataclass; current shape is correct"
    cost = DecompositionCost(
        current_cost_estimate=1500,
        componentize_savings=450,
        unify_savings=0,
        recommended_direction="hold",
        recommended_rationale=rationale,
        batch_size=None,
        struct_field_count=8,
        struct_frozen=True,
    )
    compared_by_equality = {
        "current_cost_estimate": 1500,
        "componentize_savings": 450,
        "unify_savings": 0,
        "recommended_direction": "hold",
        "struct_field_count": 8,
    }
    for attr, value in compared_by_equality.items():
        assert getattr(cost, attr) == value, attr
    # The rationale is only checked for the keyword it must mention.
    assert "frozen" in cost.recommended_rationale
    # None and True are checked by identity, not equality.
    assert cost.batch_size is None
    assert cost.struct_frozen is True
def test_optimization_candidate_7_fields() -> None:
    """OptimizationCandidate exposes the seven fields passed to its constructor."""
    files = ("src/ai_client.py", "src/app_controller.py", "src/history.py")
    candidate = OptimizationCandidate(
        candidate="Migrate 7 producers of Metadata to Result[Metadata]",
        direction="componentize",
        affected_files=files,
        estimated_savings_us=500,
        effort="small",
        priority="high",
        cross_ref="docs/reports/EXCEPTION_HANDLING_AUDIT_20260616.md",
    )
    compared_by_equality = {
        "direction": "componentize",
        "estimated_savings_us": 500,
        "effort": "small",
        "priority": "high",
    }
    for attr, value in compared_by_equality.items():
        assert getattr(candidate, attr) == value, attr
    # Free-text fields are only checked for a distinguishing substring.
    assert "Migrate" in candidate.candidate
    assert "EXCEPTION_HANDLING_AUDIT" in candidate.cross_ref
    assert len(candidate.affected_files) == 3
def test_aggregate_profile_14_fields() -> None:
    """AggregateProfile keeps every value passed to its constructor (spec section 7.1).

    All constructor arguments are read back, including the nested coverage,
    findings and cost objects, so a dropped or mis-wired field is caught.

    NOTE(review): 15 keyword arguments are passed below although the test
    name says 14 - confirm the field count against the spec.
    """
    f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
    access_evidence = AccessPatternEvidence(
        function=f, pattern="field_by_field", field_accesses={"role": 3}, confidence="high"
    )
    frequency_evidence = FrequencyEvidence(
        function=f, frequency="per_turn", source="entry_point", note="per LLM turn"
    )
    # Nested objects are bound to names so they can be checked by identity below.
    result_coverage = ResultCoverage(0, 0, 0, 0, "")
    type_alias_coverage = TypeAliasCoverage(0, 0, 0, "")
    cross_audit_findings = CrossAuditFindings((), (), (), (), ())
    decomposition_cost = DecompositionCost(0, 0, 0, "hold", "no data", None, 0, False)
    profile = AggregateProfile(
        name="Metadata",
        aggregate_kind="typealias",
        memory_dim="discussion",
        producers=(f,),
        consumers=(f,),
        access_pattern="field_by_field",
        access_pattern_evidence=(access_evidence,),
        frequency="per_turn",
        frequency_evidence=(frequency_evidence,),
        result_coverage=result_coverage,
        type_alias_coverage=type_alias_coverage,
        cross_audit_findings=cross_audit_findings,
        decomposition_cost=decomposition_cost,
        optimization_candidates=(),
        is_candidate=False,
    )
    assert profile.name == "Metadata"
    assert profile.aggregate_kind == "typealias"
    assert profile.memory_dim == "discussion"
    assert len(profile.producers) == 1
    assert len(profile.consumers) == 1
    assert profile.access_pattern == "field_by_field"
    assert len(profile.access_pattern_evidence) == 1
    assert profile.frequency == "per_turn"
    assert len(profile.frequency_evidence) == 1
    # Fields that were constructed but never read back before.
    assert profile.result_coverage is result_coverage
    assert profile.type_alias_coverage is type_alias_coverage
    assert profile.cross_audit_findings is cross_audit_findings
    assert profile.decomposition_cost is decomposition_cost
    assert profile.optimization_candidates == ()
    assert profile.is_candidate is False
def test_aggregate_profile_is_candidate_true() -> None:
    """An AggregateProfile built with is_candidate=True and no producers/consumers reads back so."""
    no_result_coverage = ResultCoverage(0, 0, 0, 0, "")
    no_alias_coverage = TypeAliasCoverage(0, 0, 0, "")
    no_findings = CrossAuditFindings((), (), (), (), ())
    unknown_cost = DecompositionCost(0, 0, 0, "insufficient_data", "candidate", None, 0, False)
    candidate_profile = AggregateProfile(
        name="ChatMessage",
        aggregate_kind="candidate_dataclass",
        memory_dim="discussion",
        producers=(),
        consumers=(),
        access_pattern="mixed",
        access_pattern_evidence=(),
        frequency="unknown",
        frequency_evidence=(),
        result_coverage=no_result_coverage,
        type_alias_coverage=no_alias_coverage,
        cross_audit_findings=no_findings,
        decomposition_cost=unknown_cost,
        optimization_candidates=(),
        is_candidate=True,
    )
    assert candidate_profile.is_candidate is True
    assert candidate_profile.aggregate_kind == "candidate_dataclass"
    # Both function tuples were passed empty and must read back empty.
    for side in ("producers", "consumers"):
        assert getattr(candidate_profile, side) == (), side