ef207cf684
Tasks 1.3-1.10: AccessPatternEvidence, FrequencyEvidence, ResultCoverage, TypeAliasCoverage, CrossAuditFinding, CrossAuditFindings, DecompositionCost, OptimizationCandidate, AggregateProfile. All frozen dataclasses per error_handling.md Pattern 1 (immutability for cross-thread safety). Phase 1 complete: 19 unit tests passing (5 enum tests + 14 dataclass tests). AggregateProfile is the central artifact with 14 required fields + 2 optional (mermaid, markdown). Phase 2 (PCG - 3 AST passes + build_pcg()) next.
273 lines
9.6 KiB
Python
273 lines
9.6 KiB
Python
"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
|
|
from __future__ import annotations
|
|
import pytest
|
|
from src.code_path_audit import (
|
|
AggregateKind,
|
|
MemoryDim,
|
|
AccessPattern,
|
|
Frequency,
|
|
RecommendedDirection,
|
|
FunctionRef,
|
|
AccessPatternEvidence,
|
|
FrequencyEvidence,
|
|
ResultCoverage,
|
|
TypeAliasCoverage,
|
|
CrossAuditFinding,
|
|
CrossAuditFindings,
|
|
DecompositionCost,
|
|
OptimizationCandidate,
|
|
AggregateProfile,
|
|
)
|
|
|
|
def test_aggregate_kind_4_values() -> None:
    """AggregateKind's Literal arguments are exactly the four expected kinds."""
    # Compared as sets, so the declaration order of the arguments is not checked.
    actual = set(AggregateKind.__args__)
    assert actual == {"typealias", "dataclass", "candidate_dataclass", "builtin"}
|
|
|
|
def test_memory_dim_7_values() -> None:
    """MemoryDim's Literal arguments are exactly the seven expected dimensions."""
    # Compared as sets, so the declaration order of the arguments is not checked.
    actual = set(MemoryDim.__args__)
    assert actual == {
        "curation",
        "discussion",
        "rag",
        "knowledge",
        "config",
        "control",
        "unknown",
    }
|
|
|
|
def test_access_pattern_5_values() -> None:
    """AccessPattern's Literal arguments are exactly the five expected patterns."""
    # Compared as sets, so the declaration order of the arguments is not checked.
    actual = set(AccessPattern.__args__)
    assert actual == {
        "whole_struct",
        "field_by_field",
        "hot_cold_split",
        "bulk_batched",
        "mixed",
    }
|
|
|
|
def test_frequency_7_values() -> None:
    """Frequency's Literal arguments are exactly the seven expected buckets."""
    # Compared as sets, so the declaration order of the arguments is not checked.
    actual = set(Frequency.__args__)
    assert actual == {
        "hot",
        "per_turn",
        "per_discussion",
        "per_request",
        "cold",
        "init",
        "unknown",
    }
|
|
|
|
def test_recommended_direction_4_values() -> None:
    """RecommendedDirection's Literal arguments are exactly the four expected directions."""
    # Compared as sets, so the declaration order of the arguments is not checked.
    actual = set(RecommendedDirection.__args__)
    assert actual == {"componentize", "unify", "hold", "insufficient_data"}
|
|
|
|
def test_function_ref_4_fields() -> None:
    """FunctionRef accepts fqname, file, line and role and exposes each unchanged."""
    field_values = {
        "fqname": "src.ai_client.AIClient.send_result",
        "file": "src/ai_client.py",
        "line": 100,
        "role": "producer",
    }
    ref = FunctionRef(**field_values)
    # Every constructor argument must read back as the value that was passed in.
    for attr_name, expected in field_values.items():
        assert getattr(ref, attr_name) == expected
|
|
|
|
def test_function_ref_frozen() -> None:
    """Assigning to a FunctionRef field must fail (immutability per error_handling.md).

    The expected exception is narrowed to ``AttributeError``. The previous
    ``(AttributeError, Exception)`` tuple was equivalent to catching any
    ``Exception``, so an unrelated failure would also have satisfied the test.
    A frozen dataclass raises ``dataclasses.FrozenInstanceError``, which is a
    subclass of ``AttributeError``.
    NOTE(review): assumes FunctionRef is a frozen dataclass - confirm.
    """
    ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
    with pytest.raises(AttributeError) as exc_info:
        ref.fqname = "src.z.w"
    # Inspect the message only after the ``with`` block: a statement placed
    # after the raising line inside the block would never execute.
    message = str(exc_info.value).lower()
    assert "frozen" in message or "cannot assign" in message
|
|
|
|
def test_access_pattern_evidence_4_fields() -> None:
    """AccessPatternEvidence exposes function, pattern, field_accesses and confidence."""
    consumer = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
    evidence = AccessPatternEvidence(
        function=consumer,
        pattern="field_by_field",
        field_accesses={"path": 3, "view_mode": 2},
        confidence="high",
    )
    # The FunctionRef is expected to be stored by reference, not copied.
    assert evidence.function is consumer
    assert (evidence.pattern, evidence.confidence) == ("field_by_field", "high")
    assert evidence.field_accesses == {"path": 3, "view_mode": 2}
|
|
|
|
def test_frequency_evidence_4_fields() -> None:
    """FrequencyEvidence exposes function, frequency, source and an explicitly given note."""
    origin = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="both")
    text_fields = {
        "frequency": "per_turn",
        "source": "entry_point",
        "note": "called per LLM turn",
    }
    evidence = FrequencyEvidence(function=origin, **text_fields)
    # The FunctionRef is expected to be stored by reference, not copied.
    assert evidence.function is origin
    for attr_name, expected in text_fields.items():
        assert getattr(evidence, attr_name) == expected
|
|
|
|
def test_frequency_evidence_default_note() -> None:
    """Omitting ``note`` when building FrequencyEvidence yields an empty string."""
    origin = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
    evidence = FrequencyEvidence(
        function=origin,
        frequency="cold",
        source="control_flow_position",
    )
    assert evidence.note == ""
|
|
|
|
def test_result_coverage_5_fields() -> None:
    """ResultCoverage exposes its four counters and a free-text summary."""
    counters = {
        "total_producers": 12,
        "result_producers": 5,
        "total_consumers": 15,
        "result_consumers": 8,
    }
    coverage = ResultCoverage(
        **counters,
        summary="5/12 producers return Result[T] (42%); 8/15 consumers branch on .errors (53%)",
    )
    for attr_name, expected in counters.items():
        assert getattr(coverage, attr_name) == expected
    # The summary is only checked for the two percentage fragments.
    for fragment in ("42%", "53%"):
        assert fragment in coverage.summary
|
|
|
|
def test_type_alias_coverage_4_fields() -> None:
    """TypeAliasCoverage exposes total_sites, typed_sites, untyped_sites and summary."""
    counters = {"total_sites": 45, "typed_sites": 38, "untyped_sites": 7}
    coverage = TypeAliasCoverage(
        **counters,
        summary="45 total sites; 38 typed (84%); 7 untyped (16%)",
    )
    for attr_name, expected in counters.items():
        assert getattr(coverage, attr_name) == expected
    # The summary is only checked for the two percentage fragments.
    for fragment in ("84%", "16%"):
        assert fragment in coverage.summary
|
|
|
|
def test_cross_audit_finding_5_fields() -> None:
    """CrossAuditFinding exposes audit_script, site_count, example_file, example_line and note."""
    field_values = {
        "audit_script": "audit_weak_types",
        "site_count": 12,
        "example_file": "src/ai_client.py",
        "example_line": 100,
        "note": "12 weak-type sites in producer+consumer functions",
    }
    finding = CrossAuditFinding(**field_values)
    # Every constructor argument must read back as the value that was passed in.
    for attr_name, expected in field_values.items():
        assert getattr(finding, attr_name) == expected
|
|
|
|
def test_cross_audit_finding_default_note() -> None:
    """Omitting ``note`` when building CrossAuditFinding yields an empty string."""
    required_only = {
        "audit_script": "audit_optional_in_3_files",
        "site_count": 0,
        "example_file": "",
        "example_line": 0,
    }
    finding = CrossAuditFinding(**required_only)
    assert finding.note == ""
|
|
|
|
def test_cross_audit_findings_5_audit_scripts() -> None:
    """CrossAuditFindings takes five audit-script fields; empty tuples read back unchanged."""
    script_fields = (
        "weak_types",
        "exception_handling",
        "optional_in_baseline",
        "config_io_ownership",
        "import_graph",
    )
    findings = CrossAuditFindings(**{field_name: () for field_name in script_fields})
    for field_name in script_fields:
        assert getattr(findings, field_name) == ()
|
|
|
|
def test_decomposition_cost_8_fields() -> None:
    """DecompositionCost is built from eight keyword fields and each one reads back."""
    numeric_fields = {
        "current_cost_estimate": 1500,
        "componentize_savings": 450,
        "unify_savings": 0,
    }
    estimate = DecompositionCost(
        **numeric_fields,
        recommended_direction="hold",
        recommended_rationale="whole_struct access on a frozen dataclass; current shape is correct",
        batch_size=None,
        struct_field_count=8,
        struct_frozen=True,
    )
    for attr_name, expected in numeric_fields.items():
        assert getattr(estimate, attr_name) == expected
    assert estimate.recommended_direction == "hold"
    # The rationale is only checked for the word "frozen", not the full text.
    assert "frozen" in estimate.recommended_rationale
    assert estimate.batch_size is None
    assert estimate.struct_field_count == 8
    assert estimate.struct_frozen is True
|
|
|
|
def test_optimization_candidate_7_fields() -> None:
    """OptimizationCandidate is built from seven keyword fields and each one reads back."""
    exact_fields = {
        "direction": "componentize",
        "estimated_savings_us": 500,
        "effort": "small",
        "priority": "high",
    }
    # Keyword order differs from the declaration order, which is irrelevant for keyword arguments.
    proposal = OptimizationCandidate(
        candidate="Migrate 7 producers of Metadata to Result[Metadata]",
        affected_files=("src/ai_client.py", "src/app_controller.py", "src/history.py"),
        cross_ref="docs/reports/EXCEPTION_HANDLING_AUDIT_20260616.md",
        **exact_fields,
    )
    # Free-text fields are checked by substring; the file list only by length.
    assert "Migrate" in proposal.candidate
    assert len(proposal.affected_files) == 3
    assert "EXCEPTION_HANDLING_AUDIT" in proposal.cross_ref
    for attr_name, expected in exact_fields.items():
        assert getattr(proposal, attr_name) == expected
|
|
|
|
def test_aggregate_profile_14_fields() -> None:
    """AggregateProfile has 14 top-level fields (per spec section 7.1).

    Builds a fully populated profile and checks that every constructor argument
    reads back, including the nested coverage, findings, cost and candidate
    fields that the test previously passed in but never asserted on.

    NOTE(review): 15 keyword arguments are passed below while the title says
    14 fields - confirm against the spec which field is not counted.
    """
    f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
    # Nested values are bound to names so they can be compared after construction.
    result_coverage = ResultCoverage(0, 0, 0, 0, "")
    type_alias_coverage = TypeAliasCoverage(0, 0, 0, "")
    cross_audit_findings = CrossAuditFindings((), (), (), (), ())
    decomposition_cost = DecompositionCost(0, 0, 0, "hold", "no data", None, 0, False)
    profile = AggregateProfile(
        name="Metadata",
        aggregate_kind="typealias",
        memory_dim="discussion",
        producers=(f,),
        consumers=(f,),
        access_pattern="field_by_field",
        access_pattern_evidence=(
            AccessPatternEvidence(
                function=f,
                pattern="field_by_field",
                field_accesses={"role": 3},
                confidence="high",
            ),
        ),
        frequency="per_turn",
        frequency_evidence=(
            FrequencyEvidence(
                function=f,
                frequency="per_turn",
                source="entry_point",
                note="per LLM turn",
            ),
        ),
        result_coverage=result_coverage,
        type_alias_coverage=type_alias_coverage,
        cross_audit_findings=cross_audit_findings,
        decomposition_cost=decomposition_cost,
        optimization_candidates=(),
        is_candidate=False,
    )
    assert profile.name == "Metadata"
    assert profile.aggregate_kind == "typealias"
    assert profile.memory_dim == "discussion"
    assert len(profile.producers) == 1
    assert len(profile.consumers) == 1
    assert profile.access_pattern == "field_by_field"
    assert len(profile.access_pattern_evidence) == 1
    assert profile.frequency == "per_turn"
    assert len(profile.frequency_evidence) == 1
    # Fields that were constructed but not previously verified.
    assert profile.result_coverage == result_coverage
    assert profile.type_alias_coverage == type_alias_coverage
    assert profile.cross_audit_findings == cross_audit_findings
    assert profile.decomposition_cost == decomposition_cost
    assert profile.optimization_candidates == ()
    assert profile.is_candidate is False
|
|
|
|
def test_aggregate_profile_is_candidate_true() -> None:
    """A profile built with is_candidate=True and no producers/consumers reads back as such."""
    # Nested values keep the original positional form, so field order is untouched.
    empty_result_coverage = ResultCoverage(0, 0, 0, 0, "")
    empty_alias_coverage = TypeAliasCoverage(0, 0, 0, "")
    no_findings = CrossAuditFindings((), (), (), (), ())
    unknown_cost = DecompositionCost(0, 0, 0, "insufficient_data", "candidate", None, 0, False)
    candidate_profile = AggregateProfile(
        name="ChatMessage",
        aggregate_kind="candidate_dataclass",
        memory_dim="discussion",
        producers=(),
        consumers=(),
        access_pattern="mixed",
        access_pattern_evidence=(),
        frequency="unknown",
        frequency_evidence=(),
        result_coverage=empty_result_coverage,
        type_alias_coverage=empty_alias_coverage,
        cross_audit_findings=no_findings,
        decomposition_cost=unknown_cost,
        optimization_candidates=(),
        is_candidate=True,
    )
    assert candidate_profile.is_candidate is True
    assert candidate_profile.aggregate_kind == "candidate_dataclass"
    assert candidate_profile.producers == ()
    assert candidate_profile.consumers == ()