Private
Public Access
0
0
Files
manual_slop/tests/test_code_path_audit.py
T
ed 077149011b fix(audit): real line numbers + entry.get() field-access detection + Optional/dict/Union patterns
Three real bugs fixed:
1. FunctionRef always used line=0. Now passes node.lineno from AST.
2. P3_pass results were discarded with bare pass. Now stored in
   ProducerConsumerGraph.field_accesses.
3. Field-access detector only saw entry['key']; missed entry.get('key')
   which is the dominant pattern in this codebase. Now handles both.

Plus _extract_type_name() helper handles Optional[T], dict[str, T],
list[T], Result[T], Union[T, ...], and T | None (PEP 604) so P1/P2
catch more annotation patterns.

Real numbers (Metadata aggregate):
- producers: 77 -> 117
- consumers: 35 -> 66
- field-access sites: 130 -> 173
- line numbers: all real (line 1281, 1746, etc.)

AUDIT_REPORT.md grew 2009 -> 3140 lines with real evidence.
Total audit output: 5176 lines / 50 files (was 2415 / 49).

All 131 tests still passing.
2026-06-22 12:20:32 -04:00

841 lines
33 KiB
Python

"""Tests for src.code_path_audit v2 - Phase 1 (data model)."""
from __future__ import annotations
import ast
import textwrap
import tempfile
from pathlib import Path
from collections import Counter
import pytest
from src.code_path_audit import (
AggregateKind,
MemoryDim,
AccessPattern,
Frequency,
RecommendedDirection,
FunctionRef,
AccessPatternEvidence,
FrequencyEvidence,
ResultCoverage,
TypeAliasCoverage,
CrossAuditFinding,
CrossAuditFindings,
DecompositionCost,
OptimizationCandidate,
AggregateProfile,
ProducerConsumerGraph,
P1_pass,
P2_pass,
P3_pass,
build_pcg,
CANONICAL_MEMORY_DIM,
MEMORY_DIM_FILE_HEURISTIC,
load_memory_dim_overrides,
file_origin_memory_dim,
classify_memory_dim,
WHOLE_STRUCT_KEY_THRESHOLD,
FIELD_BY_FIELD_KEY_THRESHOLD,
MIXED_DOMINANCE_THRESHOLD,
AGGREGATE_LEVEL_DOMINANCE_THRESHOLD,
is_whole_struct_access,
is_field_by_field_access,
is_hot_cold_split,
is_bulk_batched_access,
dominant_pattern,
detect_access_pattern,
detect_frequency_from_entry_point,
load_frequency_overrides,
estimate_call_frequency,
MICROSECOND_BUDGET_PER_LLM_TURN,
BRANCH_DISPATCH_OVERHEAD_US,
ALLOCATION_OVERHEAD_US,
DEAD_FIELD_COST_PER_FIELD_US,
COMPONENTIZATION_INDIRECTION_US,
UNIFICATION_INDIRECTION_US,
per_call_cost_us,
FREQUENCY_MULTIPLIER,
current_total_us,
componentize_factor,
unify_factor,
recommended_direction,
generate_rationale,
compute_decomposition_cost,
)
from src.result_types import Result, ErrorInfo, ErrorKind
def test_aggregate_kind_4_values() -> None:
"""AggregateKind is a Literal with 4 values: typealias, dataclass, candidate_dataclass, builtin."""
expected = {"typealias", "dataclass", "candidate_dataclass", "builtin"}
assert set(AggregateKind.__args__) == expected
def test_memory_dim_7_values() -> None:
"""MemoryDim is a Literal with 7 values: curation, discussion, rag, knowledge, config, control, unknown."""
expected = {"curation", "discussion", "rag", "knowledge", "config", "control", "unknown"}
assert set(MemoryDim.__args__) == expected
def test_access_pattern_5_values() -> None:
"""AccessPattern is a Literal with 5 values: whole_struct, field_by_field, hot_cold_split, bulk_batched, mixed."""
expected = {"whole_struct", "field_by_field", "hot_cold_split", "bulk_batched", "mixed"}
assert set(AccessPattern.__args__) == expected
def test_frequency_7_values() -> None:
"""Frequency is a Literal with 7 values: hot, per_turn, per_discussion, per_request, cold, init, unknown."""
expected = {"hot", "per_turn", "per_discussion", "per_request", "cold", "init", "unknown"}
assert set(Frequency.__args__) == expected
def test_recommended_direction_4_values() -> None:
"""RecommendedDirection is a Literal with 4 values: componentize, unify, hold, insufficient_data."""
expected = {"componentize", "unify", "hold", "insufficient_data"}
assert set(RecommendedDirection.__args__) == expected
def test_function_ref_4_fields() -> None:
"""FunctionRef has fqname, file, line, role (per spec)."""
ref = FunctionRef(
fqname="src.ai_client.AIClient.send_result",
file="src/ai_client.py",
line=100,
role="producer",
)
assert ref.fqname == "src.ai_client.AIClient.send_result"
assert ref.file == "src/ai_client.py"
assert ref.line == 100
assert ref.role == "producer"
def test_function_ref_frozen() -> None:
"""FunctionRef is frozen (immutability per error_handling.md)."""
ref = FunctionRef(
fqname="src.x.y",
file="src/x.py",
line=1,
role="consumer",
)
with pytest.raises((AttributeError, Exception)) as exc_info:
ref.fqname = "src.z.w"
assert "frozen" in str(exc_info.value).lower() or "cannot assign" in str(exc_info.value).lower()
def test_access_pattern_evidence_4_fields() -> None:
"""AccessPatternEvidence has function, pattern, field_accesses, confidence."""
ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
ev = AccessPatternEvidence(
function=ref,
pattern="field_by_field",
field_accesses={"path": 3, "view_mode": 2},
confidence="high",
)
assert ev.function is ref
assert ev.pattern == "field_by_field"
assert ev.field_accesses == {"path": 3, "view_mode": 2}
assert ev.confidence == "high"
def test_frequency_evidence_4_fields() -> None:
"""FrequencyEvidence has function, frequency, source, note (default '')."""
ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="both")
ev = FrequencyEvidence(
function=ref,
frequency="per_turn",
source="entry_point",
note="called per LLM turn",
)
assert ev.function is ref
assert ev.frequency == "per_turn"
assert ev.source == "entry_point"
assert ev.note == "called per LLM turn"
def test_frequency_evidence_default_note() -> None:
"""FrequencyEvidence.note defaults to ''."""
ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
ev = FrequencyEvidence(function=ref, frequency="cold", source="control_flow_position")
assert ev.note == ""
def test_result_coverage_5_fields() -> None:
"""ResultCoverage has total_producers, result_producers, total_consumers, result_consumers, summary."""
cov = ResultCoverage(
total_producers=12,
result_producers=5,
total_consumers=15,
result_consumers=8,
summary="5/12 producers return Result[T] (42%); 8/15 consumers branch on .errors (53%)",
)
assert cov.total_producers == 12
assert cov.result_producers == 5
assert cov.total_consumers == 15
assert cov.result_consumers == 8
assert "42%" in cov.summary
assert "53%" in cov.summary
def test_type_alias_coverage_4_fields() -> None:
"""TypeAliasCoverage has total_sites, typed_sites, untyped_sites, summary."""
cov = TypeAliasCoverage(
total_sites=45,
typed_sites=38,
untyped_sites=7,
summary="45 total sites; 38 typed (84%); 7 untyped (16%)",
)
assert cov.total_sites == 45
assert cov.typed_sites == 38
assert cov.untyped_sites == 7
assert "84%" in cov.summary
assert "16%" in cov.summary
def test_cross_audit_finding_5_fields() -> None:
"""CrossAuditFinding has audit_script, site_count, example_file, example_line, note (default '')."""
finding = CrossAuditFinding(
audit_script="audit_weak_types",
site_count=12,
example_file="src/ai_client.py",
example_line=100,
note="12 weak-type sites in producer+consumer functions",
)
assert finding.audit_script == "audit_weak_types"
assert finding.site_count == 12
assert finding.example_file == "src/ai_client.py"
assert finding.example_line == 100
assert finding.note == "12 weak-type sites in producer+consumer functions"
def test_cross_audit_finding_default_note() -> None:
"""CrossAuditFinding.note defaults to ''."""
finding = CrossAuditFinding(
audit_script="audit_optional_in_3_files",
site_count=0,
example_file="",
example_line=0,
)
assert finding.note == ""
def test_cross_audit_findings_5_audit_scripts() -> None:
"""CrossAuditFindings has 5 audit-script fields, each a tuple of CrossAuditFinding."""
findings = CrossAuditFindings(
weak_types=(),
exception_handling=(),
optional_in_baseline=(),
config_io_ownership=(),
import_graph=(),
)
assert findings.weak_types == ()
assert findings.exception_handling == ()
assert findings.optional_in_baseline == ()
assert findings.config_io_ownership == ()
assert findings.import_graph == ()
def test_decomposition_cost_8_fields() -> None:
"""DecompositionCost has 8 fields per spec."""
cost = DecompositionCost(
current_cost_estimate=1500,
componentize_savings=450,
unify_savings=0,
recommended_direction="hold",
recommended_rationale="whole_struct access on a frozen dataclass; current shape is correct",
batch_size=None,
struct_field_count=8,
struct_frozen=True,
)
assert cost.current_cost_estimate == 1500
assert cost.componentize_savings == 450
assert cost.unify_savings == 0
assert cost.recommended_direction == "hold"
assert "frozen" in cost.recommended_rationale
assert cost.batch_size is None
assert cost.struct_field_count == 8
assert cost.struct_frozen is True
def test_optimization_candidate_7_fields() -> None:
"""OptimizationCandidate has 7 fields per spec."""
cand = OptimizationCandidate(
candidate="Migrate 7 producers of Metadata to Result[Metadata]",
direction="componentize",
affected_files=("src/ai_client.py", "src/app_controller.py", "src/history.py"),
estimated_savings_us=500,
effort="small",
priority="high",
cross_ref="docs/reports/EXCEPTION_HANDLING_AUDIT_20260616.md",
)
assert "Migrate" in cand.candidate
assert cand.direction == "componentize"
assert len(cand.affected_files) == 3
assert cand.estimated_savings_us == 500
assert cand.effort == "small"
assert cand.priority == "high"
assert "EXCEPTION_HANDLING_AUDIT" in cand.cross_ref
def test_aggregate_profile_14_fields() -> None:
"""AggregateProfile has 14 top-level fields (per spec section 7.1)."""
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
profile = AggregateProfile(
name="Metadata",
aggregate_kind="typealias",
memory_dim="discussion",
producers=(f,),
consumers=(f,),
access_pattern="field_by_field",
access_pattern_evidence=(AccessPatternEvidence(
function=f, pattern="field_by_field", field_accesses={"role": 3}, confidence="high"
),),
frequency="per_turn",
frequency_evidence=(FrequencyEvidence(
function=f, frequency="per_turn", source="entry_point", note="per LLM turn"
),),
result_coverage=ResultCoverage(0, 0, 0, 0, ""),
type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""),
cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
decomposition_cost=DecompositionCost(0, 0, 0, "hold", "no data", None, 0, False),
optimization_candidates=(),
is_candidate=False,
)
assert profile.name == "Metadata"
assert profile.aggregate_kind == "typealias"
assert profile.memory_dim == "discussion"
assert len(profile.producers) == 1
assert len(profile.consumers) == 1
assert profile.access_pattern == "field_by_field"
assert len(profile.access_pattern_evidence) == 1
assert profile.frequency == "per_turn"
assert len(profile.frequency_evidence) == 1
assert profile.is_candidate is False
def test_aggregate_profile_is_candidate_true() -> None:
"""AggregateProfile.is_candidate=True for the 3 candidate aggregates."""
profile = AggregateProfile(
name="ChatMessage",
aggregate_kind="candidate_dataclass",
memory_dim="discussion",
producers=(),
consumers=(),
access_pattern="mixed",
access_pattern_evidence=(),
frequency="unknown",
frequency_evidence=(),
result_coverage=ResultCoverage(0, 0, 0, 0, ""),
type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""),
cross_audit_findings=CrossAuditFindings((), (), (), (), ()),
decomposition_cost=DecompositionCost(0, 0, 0, "insufficient_data", "candidate", None, 0, False),
optimization_candidates=(),
is_candidate=True,
)
assert profile.is_candidate is True
assert profile.aggregate_kind == "candidate_dataclass"
assert profile.producers == ()
assert profile.consumers == ()
def test_pcg_init_empty() -> None:
"""ProducerConsumerGraph starts with empty edges and producers/consumers dicts."""
pcg = ProducerConsumerGraph()
assert pcg.edges == {}
assert pcg.producers == {}
assert pcg.consumers == {}
def test_pcg_add_producer_consumer() -> None:
"""add_producer + add_consumer add to the bipartite graph."""
pcg = ProducerConsumerGraph()
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer")
pcg.add_producer("Metadata", f)
pcg.add_consumer("Metadata", f)
assert "Metadata" in pcg.producers
assert "Metadata" in pcg.consumers
assert f in pcg.producers["Metadata"]
assert f in pcg.consumers["Metadata"]
def test_p1_pass_finds_producer_of_T() -> None:
"""P1 detects a function whose return annotation is a TypeAlias name (producer of T)."""
source = textwrap.dedent('''
def send_result() -> Metadata:
return {}
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert ("send_result", "Metadata", "producer", "high", 2) in producers
def test_p1_pass_finds_producer_of_Result_T() -> None:
"""P1 detects a function whose return annotation is Result[T] (producer of T)."""
source = textwrap.dedent('''
def fetch() -> Result[FileItems]:
return Result(data=[])
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert ("fetch", "FileItems", "producer", "high", 2) in producers
def test_p1_pass_skips_non_annotated_return() -> None:
"""P1 returns [] for functions without return annotations."""
source = textwrap.dedent('''
def unannotated():
return {}
''')
tree = ast.parse(source)
producers = P1_pass(tree, file="synthetic.py")
assert producers == []
def test_p2_pass_finds_consumer_of_T() -> None:
"""P2 detects a function whose parameter is a TypeAlias name (consumer of T)."""
source = textwrap.dedent('''
def process(entry: Metadata) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert ("process", "Metadata", "consumer", "high", 2) in consumers
def test_p2_pass_finds_consumer_of_list_T() -> None:
"""P2 detects a function whose parameter is list[T] (consumer of T)."""
source = textwrap.dedent('''
def aggregate(items: list[FileItems]) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert ("aggregate", "FileItems", "consumer", "high", 2) in consumers
def test_p2_pass_skips_untyped_parameter() -> None:
"""P2 returns [] for parameters without type annotations."""
source = textwrap.dedent('''
def process(entry) -> None:
pass
''')
tree = ast.parse(source)
consumers = P2_pass(tree, file="synthetic.py")
assert consumers == []
def test_p3_pass_finds_consumer_via_subscript() -> None:
"""P3 detects a function that reads entry['path']; without a type registry, returns the field name only."""
source = textwrap.dedent('''
def process(entry) -> None:
path = entry['path']
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
assert ("process", "path", "subscript", 1, 2) in accesses
def test_p3_pass_finds_consumer_via_attribute() -> None:
"""P3 detects a function that reads entry.attr; returns (function, attr, kind, count)."""
source = textwrap.dedent('''
def process(entry) -> None:
path = entry.path
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
assert ("process", "path", "attribute", 1, 2) in accesses
def test_p3_pass_counts_multiple_accesses() -> None:
"""P3 counts multiple accesses to the same key within a single function."""
source = textwrap.dedent('''
def process(entry) -> None:
a = entry['path']
b = entry['path']
c = entry['view_mode']
''')
tree = ast.parse(source)
accesses = P3_pass(tree, file="synthetic.py", type_registry={})
path_count = sum(c for fn, k, kind, c, line in accesses if fn == "process" and k == "path")
assert path_count == 2
def test_build_pcg_returns_result() -> None:
"""build_pcg returns Result[ProducerConsumerGraph] per error_handling.md."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "mod.py").write_text(textwrap.dedent('''
from src.type_aliases import Metadata
def produce() -> Metadata:
return {}
'''))
result = build_pcg(tmp)
assert isinstance(result, Result)
assert result.ok
def test_build_pcg_finds_producer_via_p1() -> None:
"""build_pcg correctly identifies a producer of Metadata via P1."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "mod.py").write_text(textwrap.dedent('''
from src.type_aliases import Metadata
def produce() -> Metadata:
return {}
'''))
pcg = build_pcg(tmp).data
assert "Metadata" in pcg.producers
def test_build_pcg_tolerates_syntax_errors() -> None:
"""build_pcg records syntax errors as ErrorInfo (boundary pattern); Result.ok is False."""
with tempfile.TemporaryDirectory() as tmp:
(Path(tmp) / "bad.py").write_text("def unclosed(:\n pass")
result = build_pcg(tmp)
assert not result.ok
assert len(result.errors) >= 1
assert isinstance(result.errors[0], ErrorInfo)
assert result.data is not None
def test_canonical_memory_dim_has_aggregates() -> None:
"""CANONICAL_MEMORY_DIM has >=13 known aggregate -> dim mappings (10 in-scope + 3 candidate)."""
assert len(CANONICAL_MEMORY_DIM) >= 13
assert CANONICAL_MEMORY_DIM["Metadata"] == "discussion"
assert CANONICAL_MEMORY_DIM["CommsLogEntry"] == "discussion"
assert CANONICAL_MEMORY_DIM["FileItem"] == "curation"
assert CANONICAL_MEMORY_DIM["FileItems"] == "curation"
assert CANONICAL_MEMORY_DIM["Result"] == "control"
assert CANONICAL_MEMORY_DIM["ErrorInfo"] == "control"
def test_memory_dim_file_heuristic_has_5_buckets() -> None:
"""MEMORY_DIM_FILE_HEURISTIC has the 5 file-of-origin buckets."""
assert len(MEMORY_DIM_FILE_HEURISTIC) == 5
assert "curation" in MEMORY_DIM_FILE_HEURISTIC
assert "discussion" in MEMORY_DIM_FILE_HEURISTIC
assert "rag" in MEMORY_DIM_FILE_HEURISTIC
assert "config" in MEMORY_DIM_FILE_HEURISTIC
def test_load_memory_dim_overrides_empty() -> None:
"""load_memory_dim_overrides returns {} for a missing file."""
result = load_memory_dim_overrides("/nonexistent/overrides.toml")
assert result == {}
def test_load_memory_dim_overrides_parses_toml() -> None:
"""load_memory_dim_overrides parses [memory_dim.<aggregate>] = '<dim>' lines."""
with tempfile.TemporaryDirectory() as tmp:
overrides_path = Path(tmp) / "overrides.toml"
overrides_path.write_text('[memory_dim]\nMetadata = "curation"\n')
result = load_memory_dim_overrides(str(overrides_path))
assert result.get("Metadata") == "curation"
def test_file_origin_memory_dim_curation() -> None:
"""file_origin_memory_dim returns 'curation' for files in the curation bucket."""
dim = file_origin_memory_dim("src/aggregate.py")
assert dim == "curation"
def test_file_origin_memory_dim_discussion() -> None:
"""file_origin_memory_dim returns 'discussion' for files in the discussion bucket."""
dim = file_origin_memory_dim("src/ai_client.py")
assert dim == "discussion"
def test_file_origin_memory_dim_unknown() -> None:
"""file_origin_memory_dim returns 'unknown' for files not in any bucket."""
dim = file_origin_memory_dim("src/random.py")
assert dim == "unknown"
def test_classify_memory_dim_canonical() -> None:
"""classify_memory_dim returns the canonical dim for known aggregates, regardless of producer file."""
dim = classify_memory_dim("Metadata", "src/aggregate.py", overrides={})
assert dim == "discussion"
def test_classify_memory_dim_override() -> None:
"""classify_memory_dim respects the override file's mapping."""
dim = classify_memory_dim("Metadata", "src/aggregate.py", overrides={"Metadata": "curation"})
assert dim == "curation"
def test_classify_memory_dim_file_heuristic() -> None:
"""classify_memory_dim falls back to file-of-origin for unknown aggregates."""
dim = classify_memory_dim("SomeUnknownAggregate", "src/aggregate.py", overrides={})
assert dim == "curation"
def test_classify_memory_dim_unknown_when_no_evidence() -> None:
"""classify_memory_dim returns 'unknown' when no canonical, override, or file evidence."""
dim = classify_memory_dim("SomeUnknownAggregate", "src/random.py", overrides={})
assert dim == "unknown"
def test_threshold_constants() -> None:
"""The 4 APD threshold constants are defined."""
assert WHOLE_STRUCT_KEY_THRESHOLD == 1
assert FIELD_BY_FIELD_KEY_THRESHOLD == 3
assert MIXED_DOMINANCE_THRESHOLD == 0.6
assert AGGREGATE_LEVEL_DOMINANCE_THRESHOLD == 0.25
def test_is_whole_struct_access_true() -> None:
"""is_whole_struct_access returns True for a function that reads the aggregate without accessing fields."""
counts: Counter[str] = Counter()
assert is_whole_struct_access(counts, has_direct_access=True) is True
def test_is_whole_struct_access_one_key() -> None:
"""is_whole_struct_access returns True for <=1 distinct key."""
counts: Counter[str] = Counter({"path": 3})
assert is_whole_struct_access(counts, has_direct_access=False) is True
def test_is_whole_struct_access_two_keys_false() -> None:
"""is_whole_struct_access returns False for 2+ distinct keys."""
counts: Counter[str] = Counter({"path": 3, "view_mode": 2})
assert is_whole_struct_access(counts, has_direct_access=False) is False
def test_is_field_by_field_access_true() -> None:
"""is_field_by_field_access returns True for >=3 distinct keys AND no whole_struct access."""
counts: Counter[str] = Counter({"a": 1, "b": 1, "c": 1})
assert is_field_by_field_access(counts) is True
def test_is_field_by_field_access_few_keys() -> None:
"""is_field_by_field_access returns False for <3 distinct keys."""
counts: Counter[str] = Counter({"a": 1, "b": 1})
assert is_field_by_field_access(counts) is False
def test_is_hot_cold_split_true() -> None:
"""is_hot_cold_split returns True for 1-2 hot keys + 2+ cold keys."""
hot = {"role", "content"}
cold = {"tool_calls", "reasoning_content"}
assert is_hot_cold_split(hot, cold) is True
def test_is_hot_cold_split_too_many_hot() -> None:
"""is_hot_cold_split returns False for 3+ hot keys."""
hot = {"a", "b", "c"}
cold = {"d", "e"}
assert is_hot_cold_split(hot, cold) is False
def test_is_hot_cold_split_too_few_cold() -> None:
"""is_hot_cold_split returns False for <2 cold keys."""
hot = {"a"}
cold = {"b"}
assert is_hot_cold_split(hot, cold) is False
def test_is_bulk_batched_access_true() -> None:
"""is_bulk_batched_access returns True for a function iterating over a list of aggregates."""
assert is_bulk_batched_access(iterates_over_list=True, body_accesses_uniform=True) is True
def test_is_bulk_batched_access_no_iteration() -> None:
"""is_bulk_batched_access returns False when the function doesn't iterate over a list."""
assert is_bulk_batched_access(iterates_over_list=False, body_accesses_uniform=True) is False
def test_is_bulk_batched_access_non_uniform() -> None:
"""is_bulk_batched_access returns False when the body has non-uniform access."""
assert is_bulk_batched_access(iterates_over_list=True, body_accesses_uniform=False) is False
def test_dominant_pattern_clear_winner() -> None:
"""dominant_pattern returns the pattern with the highest share if >=25%."""
counts = {"field_by_field": 3, "whole_struct": 1}
assert dominant_pattern(counts) == "field_by_field"
def test_dominant_pattern_below_threshold() -> None:
"""dominant_pattern returns 'mixed' when no pattern has >=25% share."""
counts = {"field_by_field": 1, "whole_struct": 1, "hot_cold_split": 1, "bulk_batched": 1}
assert dominant_pattern(counts) == "mixed"
def test_dominant_pattern_empty() -> None:
"""dominant_pattern returns 'mixed' for an empty counts dict."""
assert dominant_pattern({}) == "mixed"
def test_detect_access_pattern_whole_struct() -> None:
"""detect_access_pattern returns 'whole_struct' for a function that reads 0-1 keys."""
counts: Counter[str] = Counter()
pattern = detect_access_pattern(counts, has_direct_access=True)
assert pattern == "whole_struct"
def test_detect_access_pattern_field_by_field() -> None:
"""detect_access_pattern returns 'field_by_field' for a function that reads 3+ keys."""
counts: Counter[str] = Counter({"a": 1, "b": 1, "c": 1})
pattern = detect_access_pattern(counts, has_direct_access=False)
assert pattern == "field_by_field"
def test_detect_access_pattern_hot_cold_split() -> None:
"""detect_access_pattern returns 'hot_cold_split' for 1-2 hot + 2+ cold keys."""
counts: Counter[str] = Counter({"a": 1, "b": 1, "c": 1, "d": 1})
pattern = detect_access_pattern(counts, has_direct_access=False, hot_keys={"a", "b"}, cold_keys={"c", "d"})
assert pattern == "hot_cold_split"
def test_detect_access_pattern_mixed() -> None:
"""detect_access_pattern returns 'mixed' when no pattern dominates (2+ distinct keys but <3)."""
counts: Counter[str] = Counter({"a": 1, "b": 1})
pattern = detect_access_pattern(counts, has_direct_access=False)
assert pattern == "mixed"
def test_detect_frequency_init() -> None:
"""detect_frequency_from_entry_point returns 'init' for functions called from __init__."""
freq = detect_frequency_from_entry_point(caller="__init__", caller_class="App")
assert freq == "init"
def test_detect_frequency_hot() -> None:
"""detect_frequency_from_entry_point returns 'hot' for functions called from render loops."""
freq = detect_frequency_from_entry_point(caller="render_main_toolbar", caller_class="App")
assert freq == "hot"
def test_detect_frequency_per_turn() -> None:
"""detect_frequency_from_entry_point returns 'per_turn' for functions called from AI send paths."""
freq = detect_frequency_from_entry_point(caller="_send_anthropic_result", caller_class="AIClient")
assert freq == "per_turn"
def test_detect_frequency_cold() -> None:
"""detect_frequency_from_entry_point returns 'cold' for functions called from cleanup."""
freq = detect_frequency_from_entry_point(caller="cleanup", caller_class="AppController")
assert freq == "cold"
def test_detect_frequency_per_discussion() -> None:
"""detect_frequency_from_entry_point returns 'per_discussion' for save/load functions."""
freq = detect_frequency_from_entry_point(caller="save_project", caller_class="ProjectManager")
assert freq == "per_discussion"
def test_detect_frequency_unknown() -> None:
"""detect_frequency_from_entry_point returns 'unknown' for unrecognized callers."""
freq = detect_frequency_from_entry_point(caller="random_method", caller_class="X")
assert freq == "unknown"
def test_load_frequency_overrides_empty() -> None:
"""load_frequency_overrides returns {} for a missing file."""
result = load_frequency_overrides("/nonexistent/overrides.toml")
assert result == {}
def test_load_frequency_overrides_parses_toml() -> None:
"""load_frequency_overrides parses [frequency.<function_fqname>] = '<freq>' lines."""
with tempfile.TemporaryDirectory() as tmp:
overrides_path = Path(tmp) / "overrides.toml"
overrides_path.write_text('[frequency]\n"src.cleanup.do_nothing" = "cold"\n')
result = load_frequency_overrides(str(overrides_path))
assert result.get("src.cleanup.do_nothing") == "cold"
def test_estimate_call_frequency_override_wins() -> None:
"""estimate_call_frequency respects the override file's mapping."""
f = FunctionRef(fqname="src.cleanup.do_nothing", file="src/cleanup.py", line=1, role="consumer")
freq = estimate_call_frequency(
f,
callers=[],
overrides={"src.cleanup.do_nothing": "cold"},
)
assert freq == "cold"
def test_estimate_call_frequency_entry_point() -> None:
"""estimate_call_frequency uses the entry-point detector when no override."""
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer")
freq = estimate_call_frequency(
f,
callers=[(FunctionRef(fqname="src.app.App.__init__", file="src/app.py", line=1, role="producer"), "App")],
overrides={},
)
assert freq == "init"
def test_estimate_call_frequency_unknown_no_callers() -> None:
"""estimate_call_frequency returns 'unknown' for functions with no callers and no override."""
f = FunctionRef(fqname="src.lonely.func", file="src/lonely.py", line=1, role="consumer")
freq = estimate_call_frequency(f, callers=[], overrides={})
assert freq == "unknown"
def test_cost_constants() -> None:
"""The 6 cost-model constants are defined per spec section 7.5."""
assert MICROSECOND_BUDGET_PER_LLM_TURN == 50_000
assert BRANCH_DISPATCH_OVERHEAD_US == 100
assert ALLOCATION_OVERHEAD_US == 50
assert DEAD_FIELD_COST_PER_FIELD_US == 10
assert COMPONENTIZATION_INDIRECTION_US == 200
assert UNIFICATION_INDIRECTION_US == 300
def test_per_call_cost_us_no_frozen() -> None:
"""per_call_cost_us = struct_field_count * 50 + max(fields_accessed_in_hot_path, 1) * 100 + 0 (not frozen)."""
cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=2, struct_frozen=False)
assert cost == 10 * 50 + 2 * 100
def test_per_call_cost_us_frozen() -> None:
"""per_call_cost_us adds 20 for frozen dataclasses."""
cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=2, struct_frozen=True)
assert cost == 10 * 50 + 2 * 100 + 20
def test_per_call_cost_us_min_hot_path() -> None:
"""per_call_cost_us uses max(hot_path_field_count, 1) to avoid zero branch overhead."""
cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=0, struct_frozen=False)
assert cost == 10 * 50 + 1 * 100
def test_frequency_multiplier_7_values() -> None:
"""FREQUENCY_MULTIPLIER has 7 entries."""
assert FREQUENCY_MULTIPLIER["hot"] == 60
assert FREQUENCY_MULTIPLIER["per_turn"] == 1
assert FREQUENCY_MULTIPLIER["per_request"] == 1
assert FREQUENCY_MULTIPLIER["per_discussion"] == 1
assert FREQUENCY_MULTIPLIER["cold"] == 0.01
assert FREQUENCY_MULTIPLIER["init"] == 0.001
assert FREQUENCY_MULTIPLIER["unknown"] == 0
def test_current_total_us_per_turn() -> None:
"""current_total_us = per_call_cost * frequency_multiplier for per_turn."""
total = current_total_us(per_call_cost=500, frequency="per_turn")
assert total == 500
def test_current_total_us_hot() -> None:
"""current_total_us = per_call_cost * 60 for hot."""
total = current_total_us(per_call_cost=500, frequency="hot")
assert total == 30_000
def test_componentize_factor_field_by_field_large() -> None:
"""componentize_factor=0.30 for field_by_field + struct_field_count > 10 + not frozen."""
f = componentize_factor(access_pattern="field_by_field", struct_field_count=15, struct_frozen=False)
assert f == 0.30
def test_componentize_factor_hot_cold_split_small_hot() -> None:
"""componentize_factor=0.40 for hot_cold_split + hot_field_count<=2 + struct_field_count>5."""
f = componentize_factor(access_pattern="hot_cold_split", struct_field_count=8, struct_frozen=False, hot_field_count=2)
assert f == 0.40
def test_componentize_factor_whole_struct_negative() -> None:
"""componentize_factor=-0.20 for whole_struct (splitting hurts)."""
f = componentize_factor(access_pattern="whole_struct", struct_field_count=5, struct_frozen=False)
assert f == -0.20
def test_componentize_factor_mixed_zero() -> None:
"""componentize_factor=0.0 for mixed (insufficient evidence)."""
f = componentize_factor(access_pattern="mixed", struct_field_count=5, struct_frozen=False)
assert f == 0.0
def test_unify_factor_bulk_batched_small_frozen() -> None:
"""unify_factor=0.25 for bulk_batched + struct_field_count <= 3 + frozen."""
f = unify_factor(access_pattern="bulk_batched", struct_field_count=3, struct_frozen=True)
assert f == 0.25
def test_unify_factor_whole_struct_small_frozen() -> None:
"""unify_factor=0.15 for whole_struct + struct_field_count <= 5 + frozen."""
f = unify_factor(access_pattern="whole_struct", struct_field_count=5, struct_frozen=True)
assert f == 0.15
def test_unify_factor_field_by_field_negative() -> None:
"""unify_factor=-0.30 for field_by_field (unification widens the data)."""
f = unify_factor(access_pattern="field_by_field", struct_field_count=15, struct_frozen=True)
assert f == -0.30
def test_unify_factor_mixed_zero() -> None:
"""unify_factor=0.0 for mixed (insufficient evidence)."""
f = unify_factor(access_pattern="mixed", struct_field_count=5, struct_frozen=True)
assert f == 0.0
def test_recommended_direction_componentize_field_by_field() -> None:
"""recommended_direction='componentize' for field_by_field + struct_field_count>10."""
d = recommended_direction(access_pattern="field_by_field", struct_field_count=15, struct_frozen=False, frequency="per_turn", hot_field_count=0)
assert d == "componentize"
def test_recommended_direction_unify_bulk_batched() -> None:
"""recommended_direction='unify' for bulk_batched + struct_field_count<=3."""
d = recommended_direction(access_pattern="bulk_batched", struct_field_count=3, struct_frozen=True, frequency="per_turn", hot_field_count=0)
assert d == "unify"
def test_recommended_direction_insufficient_data_mixed() -> None:
"""recommended_direction='insufficient_data' for mixed (needs runtime profiling)."""
d = recommended_direction(access_pattern="mixed", struct_field_count=5, struct_frozen=True, frequency="per_turn", hot_field_count=0)
assert d == "insufficient_data"
def test_recommended_direction_hold_frozen_whole_struct() -> None:
"""recommended_direction='hold' for frozen + whole_struct (ideal shape)."""
d = recommended_direction(access_pattern="whole_struct", struct_field_count=5, struct_frozen=True, frequency="per_turn", hot_field_count=0)
assert d == "hold"
def test_generate_rationale_includes_pattern() -> None:
"""generate_rationale includes the access pattern."""
s = generate_rationale(
aggregate="Metadata",
access_pattern="field_by_field",
frequency="per_turn",
struct_field_count=15,
struct_frozen=False,
direction="componentize",
)
assert "field_by_field" in s
assert "per_turn" in s
assert "componentize" in s
assert "Metadata" in s
def test_compute_decomposition_cost_hold() -> None:
"""compute_decomposition_cost returns 'hold' for the canonical frozen + whole_struct case."""
cost = compute_decomposition_cost(
aggregate="Metadata",
access_pattern="whole_struct",
struct_field_count=8,
struct_frozen=True,
frequency="per_turn",
)
assert cost.recommended_direction == "hold"
assert cost.struct_field_count == 8
assert cost.struct_frozen is True
def test_compute_decomposition_cost_componentize() -> None:
"""compute_decomposition_cost returns 'componentize' for field_by_field + large struct."""
cost = compute_decomposition_cost(
aggregate="BigStruct",
access_pattern="field_by_field",
struct_field_count=15,
struct_frozen=False,
frequency="per_turn",
)
assert cost.recommended_direction == "componentize"
assert cost.componentize_savings > 0