diff --git a/src/code_path_audit.py b/src/code_path_audit.py index a1682dca..fc68bd0b 100644 --- a/src/code_path_audit.py +++ b/src/code_path_audit.py @@ -9,7 +9,7 @@ postfix DSL + markdown + prefix tree text. See conductor/tracks/code_path_audit_20260607/spec_v2.md. """ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Literal AggregateKind = Literal[ @@ -59,4 +59,90 @@ class FunctionRef: fqname: str file: str line: int - role: str \ No newline at end of file + role: str + +@dataclass(frozen=True) +class AccessPatternEvidence: + function: FunctionRef + pattern: AccessPattern + field_accesses: dict[str, int] + confidence: str + +@dataclass(frozen=True) +class FrequencyEvidence: + function: FunctionRef + frequency: Frequency + source: str + note: str = "" + +@dataclass(frozen=True) +class ResultCoverage: + total_producers: int + result_producers: int + total_consumers: int + result_consumers: int + summary: str + +@dataclass(frozen=True) +class TypeAliasCoverage: + total_sites: int + typed_sites: int + untyped_sites: int + summary: str + +@dataclass(frozen=True) +class CrossAuditFinding: + audit_script: str + site_count: int + example_file: str + example_line: int + note: str = "" + +@dataclass(frozen=True) +class CrossAuditFindings: + weak_types: tuple[CrossAuditFinding, ...] + exception_handling: tuple[CrossAuditFinding, ...] + optional_in_baseline: tuple[CrossAuditFinding, ...] + config_io_ownership: tuple[CrossAuditFinding, ...] + import_graph: tuple[CrossAuditFinding, ...] + +@dataclass(frozen=True) +class DecompositionCost: + current_cost_estimate: int + componentize_savings: int + unify_savings: int + recommended_direction: RecommendedDirection + recommended_rationale: str + batch_size: int | None + struct_field_count: int + struct_frozen: bool + +@dataclass(frozen=True) +class OptimizationCandidate: + candidate: str + direction: RecommendedDirection + affected_files: tuple[str, ...] + estimated_savings_us: int + effort: str + priority: str + cross_ref: str = "" + +@dataclass(frozen=True) +class AggregateProfile: + name: str + aggregate_kind: AggregateKind + memory_dim: MemoryDim + producers: tuple[FunctionRef, ...] + consumers: tuple[FunctionRef, ...] + access_pattern: AccessPattern + access_pattern_evidence: tuple[AccessPatternEvidence, ...] + frequency: Frequency + frequency_evidence: tuple[FrequencyEvidence, ...] + result_coverage: ResultCoverage + type_alias_coverage: TypeAliasCoverage + cross_audit_findings: CrossAuditFindings + decomposition_cost: DecompositionCost + optimization_candidates: tuple[OptimizationCandidate, ...] + is_candidate: bool + mermaid: str = "" + markdown: str = "" \ No newline at end of file diff --git a/tests/test_code_path_audit.py b/tests/test_code_path_audit.py index 96ba1c8c..caaf9499 100644 --- a/tests/test_code_path_audit.py +++ b/tests/test_code_path_audit.py @@ -8,6 +8,15 @@ from src.code_path_audit import ( Frequency, RecommendedDirection, FunctionRef, + AccessPatternEvidence, + FrequencyEvidence, + ResultCoverage, + TypeAliasCoverage, + CrossAuditFinding, + CrossAuditFindings, + DecompositionCost, + OptimizationCandidate, + AggregateProfile, ) def test_aggregate_kind_4_values() -> None: @@ -58,4 +67,207 @@ def test_function_ref_frozen() -> None: ) with pytest.raises((AttributeError, Exception)) as exc_info: ref.fqname = "src.z.w" - assert "frozen" in str(exc_info.value).lower() or "cannot assign" in str(exc_info.value).lower() \ No newline at end of file + assert "frozen" in str(exc_info.value).lower() or "cannot assign" in str(exc_info.value).lower() + +def test_access_pattern_evidence_4_fields() -> None: + """AccessPatternEvidence has function, pattern, field_accesses, confidence.""" + ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer") + ev = AccessPatternEvidence( + function=ref, + pattern="field_by_field", + field_accesses={"path": 3, "view_mode": 2}, + confidence="high", + ) + assert ev.function is ref + assert ev.pattern == "field_by_field" + assert ev.field_accesses == {"path": 3, "view_mode": 2} + assert ev.confidence == "high" + +def test_frequency_evidence_4_fields() -> None: + """FrequencyEvidence has function, frequency, source, note (default '').""" + ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="both") + ev = FrequencyEvidence( + function=ref, + frequency="per_turn", + source="entry_point", + note="called per LLM turn", + ) + assert ev.function is ref + assert ev.frequency == "per_turn" + assert ev.source == "entry_point" + assert ev.note == "called per LLM turn" + +def test_frequency_evidence_default_note() -> None: + """FrequencyEvidence.note defaults to ''.""" + ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer") + ev = FrequencyEvidence(function=ref, frequency="cold", source="control_flow_position") + assert ev.note == "" + +def test_result_coverage_5_fields() -> None: + """ResultCoverage has total_producers, result_producers, total_consumers, result_consumers, summary.""" + cov = ResultCoverage( + total_producers=12, + result_producers=5, + total_consumers=15, + result_consumers=8, + summary="5/12 producers return Result[T] (42%); 8/15 consumers branch on .errors (53%)", + ) + assert cov.total_producers == 12 + assert cov.result_producers == 5 + assert cov.total_consumers == 15 + assert cov.result_consumers == 8 + assert "42%" in cov.summary + assert "53%" in cov.summary + +def test_type_alias_coverage_4_fields() -> None: + """TypeAliasCoverage has total_sites, typed_sites, untyped_sites, summary.""" + cov = TypeAliasCoverage( + total_sites=45, + typed_sites=38, + untyped_sites=7, + summary="45 total sites; 38 typed (84%); 7 untyped (16%)", + ) + assert cov.total_sites == 45 + assert cov.typed_sites == 38 + assert cov.untyped_sites == 7 + assert "84%" in cov.summary + assert "16%" in cov.summary + +def test_cross_audit_finding_5_fields() -> None: + """CrossAuditFinding has audit_script, site_count, example_file, example_line, note (default '').""" + finding = CrossAuditFinding( + audit_script="audit_weak_types", + site_count=12, + example_file="src/ai_client.py", + example_line=100, + note="12 weak-type sites in producer+consumer functions", + ) + assert finding.audit_script == "audit_weak_types" + assert finding.site_count == 12 + assert finding.example_file == "src/ai_client.py" + assert finding.example_line == 100 + assert finding.note == "12 weak-type sites in producer+consumer functions" + +def test_cross_audit_finding_default_note() -> None: + """CrossAuditFinding.note defaults to ''.""" + finding = CrossAuditFinding( + audit_script="audit_optional_in_3_files", + site_count=0, + example_file="", + example_line=0, + ) + assert finding.note == "" + +def test_cross_audit_findings_5_audit_scripts() -> None: + """CrossAuditFindings has 5 audit-script fields, each a tuple of CrossAuditFinding.""" + findings = CrossAuditFindings( + weak_types=(), + exception_handling=(), + optional_in_baseline=(), + config_io_ownership=(), + import_graph=(), + ) + assert findings.weak_types == () + assert findings.exception_handling == () + assert findings.optional_in_baseline == () + assert findings.config_io_ownership == () + assert findings.import_graph == () + +def test_decomposition_cost_8_fields() -> None: + """DecompositionCost has 8 fields per spec.""" + cost = DecompositionCost( + current_cost_estimate=1500, + componentize_savings=450, + unify_savings=0, + recommended_direction="hold", + recommended_rationale="whole_struct access on a frozen dataclass; current shape is correct", + batch_size=None, + struct_field_count=8, + struct_frozen=True, + ) + assert cost.current_cost_estimate == 1500 + assert cost.componentize_savings == 450 + assert cost.unify_savings == 0 + assert cost.recommended_direction == "hold" + assert "frozen" in cost.recommended_rationale + assert cost.batch_size is None + assert cost.struct_field_count == 8 + assert cost.struct_frozen is True + +def test_optimization_candidate_7_fields() -> None: + """OptimizationCandidate has 7 fields per spec.""" + cand = OptimizationCandidate( + candidate="Migrate 7 producers of Metadata to Result[Metadata]", + direction="componentize", + affected_files=("src/ai_client.py", "src/app_controller.py", "src/history.py"), + estimated_savings_us=500, + effort="small", + priority="high", + cross_ref="docs/reports/EXCEPTION_HANDLING_AUDIT_20260616.md", + ) + assert "Migrate" in cand.candidate + assert cand.direction == "componentize" + assert len(cand.affected_files) == 3 + assert cand.estimated_savings_us == 500 + assert cand.effort == "small" + assert cand.priority == "high" + assert "EXCEPTION_HANDLING_AUDIT" in cand.cross_ref + +def test_aggregate_profile_14_fields() -> None: + """AggregateProfile has 14 top-level fields (per spec section 7.1).""" + f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="producer") + profile = AggregateProfile( + name="Metadata", + aggregate_kind="typealias", + memory_dim="discussion", + producers=(f,), + consumers=(f,), + access_pattern="field_by_field", + access_pattern_evidence=(AccessPatternEvidence( + function=f, pattern="field_by_field", field_accesses={"role": 3}, confidence="high" + ),), + frequency="per_turn", + frequency_evidence=(FrequencyEvidence( + function=f, frequency="per_turn", source="entry_point", note="per LLM turn" + ),), + result_coverage=ResultCoverage(0, 0, 0, 0, ""), + type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""), + cross_audit_findings=CrossAuditFindings((), (), (), (), ()), + decomposition_cost=DecompositionCost(0, 0, 0, "hold", "no data", None, 0, False), + optimization_candidates=(), + is_candidate=False, + ) + assert profile.name == "Metadata" + assert profile.aggregate_kind == "typealias" + assert profile.memory_dim == "discussion" + assert len(profile.producers) == 1 + assert len(profile.consumers) == 1 + assert profile.access_pattern == "field_by_field" + assert len(profile.access_pattern_evidence) == 1 + assert profile.frequency == "per_turn" + assert len(profile.frequency_evidence) == 1 + assert profile.is_candidate is False + +def test_aggregate_profile_is_candidate_true() -> None: + """AggregateProfile.is_candidate=True for the 3 candidate aggregates.""" + profile = AggregateProfile( + name="ChatMessage", + aggregate_kind="candidate_dataclass", + memory_dim="discussion", + producers=(), + consumers=(), + access_pattern="mixed", + access_pattern_evidence=(), + frequency="unknown", + frequency_evidence=(), + result_coverage=ResultCoverage(0, 0, 0, 0, ""), + type_alias_coverage=TypeAliasCoverage(0, 0, 0, ""), + cross_audit_findings=CrossAuditFindings((), (), (), (), ()), + decomposition_cost=DecompositionCost(0, 0, 0, "insufficient_data", "candidate", None, 0, False), + optimization_candidates=(), + is_candidate=True, + ) + assert profile.is_candidate is True + assert profile.aggregate_kind == "candidate_dataclass" + assert profile.producers == () + assert profile.consumers == () \ No newline at end of file