"""Tests for src.code_path_audit v2 - cross-audit integration + DSL.""" from __future__ import annotations import ast import textwrap import tempfile import json from pathlib import Path from collections import Counter import pytest from src.code_path_audit import ( AggregateKind, MemoryDim, AccessPattern, Frequency, RecommendedDirection, FunctionRef, AccessPatternEvidence, FrequencyEvidence, ResultCoverage, TypeAliasCoverage, CrossAuditFinding, CrossAuditFindings, DecompositionCost, OptimizationCandidate, AggregateProfile, ProducerConsumerGraph, P1_pass, P2_pass, P3_pass, build_pcg, CANONICAL_MEMORY_DIM, MEMORY_DIM_FILE_HEURISTIC, load_memory_dim_overrides, file_origin_memory_dim, classify_memory_dim, WHOLE_STRUCT_KEY_THRESHOLD, FIELD_BY_FIELD_KEY_THRESHOLD, MIXED_DOMINANCE_THRESHOLD, AGGREGATE_LEVEL_DOMINANCE_THRESHOLD, is_whole_struct_access, is_field_by_field_access, is_hot_cold_split, is_bulk_batched_access, dominant_pattern, detect_access_pattern, detect_frequency_from_entry_point, load_frequency_overrides, estimate_call_frequency, MICROSECOND_BUDGET_PER_LLM_TURN, BRANCH_DISPATCH_OVERHEAD_US, ALLOCATION_OVERHEAD_US, DEAD_FIELD_COST_PER_FIELD_US, COMPONENTIZATION_INDIRECTION_US, UNIFICATION_INDIRECTION_US, per_call_cost_us, FREQUENCY_MULTIPLIER, current_total_us, componentize_factor, unify_factor, recommended_direction, generate_rationale, compute_decomposition_cost, read_input_json, INPUT_JSON_CONTRACTS, find_enclosing_function, compute_result_coverage, compute_type_alias_coverage, aggregate_cross_audit_findings, run_all_cross_audit_reads, DSL_WORD_ARITY_V2, ) from src.result_types import Result, ErrorInfo, ErrorKind # Phase 7 tests def test_read_input_json_success() -> None: """read_input_json returns Result[dict] on success.""" with tempfile.TemporaryDirectory() as tmp: p = Path(tmp) / "ok.json" p.write_text(json.dumps({"findings": [{"file": "x.py", "line": 1}]})) result = read_input_json(str(p)) assert result.ok assert result.data == {"findings": [{"file": "x.py", "line": 1}]} def test_read_input_json_missing_file() -> None: """read_input_json returns Result with ErrorInfo when the file is missing.""" result = read_input_json("/nonexistent/file.json") assert not result.ok assert len(result.errors) == 1 assert result.errors[0].kind == ErrorKind.NOT_FOUND def test_read_input_json_malformed_json() -> None: """read_input_json returns Result with ErrorInfo when the JSON is malformed.""" with tempfile.TemporaryDirectory() as tmp: p = Path(tmp) / "bad.json" p.write_text("{invalid json") result = read_input_json(str(p)) assert not result.ok assert result.errors[0].kind == ErrorKind.INVALID_INPUT def test_input_json_contracts_6_entries() -> None: """INPUT_JSON_CONTRACTS has 6 entries.""" assert len(INPUT_JSON_CONTRACTS) == 6 assert "audit_weak_types" in INPUT_JSON_CONTRACTS assert "audit_exception_handling" in INPUT_JSON_CONTRACTS assert "audit_optional_in_3_files" in INPUT_JSON_CONTRACTS assert "audit_no_models_config_io" in INPUT_JSON_CONTRACTS assert "audit_main_thread_imports" in INPUT_JSON_CONTRACTS assert "type_registry" in INPUT_JSON_CONTRACTS def test_find_enclosing_function_match() -> None: """find_enclosing_function returns the function ref whose (file, line) range contains the finding.""" f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer") refs = [ f, FunctionRef(fqname="src.x.z", file="src/x.py", line=100, role="consumer"), ] result = find_enclosing_function(file="src/x.py", line=15, function_refs=refs) assert result is f def test_find_enclosing_function_no_match() -> None: """find_enclosing_function returns None when no function contains the finding's (file, line).""" f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer") refs = [f] result = find_enclosing_function(file="src/y.py", line=15, function_refs=refs) assert result is None def test_compute_result_coverage_no_producers() -> None: """compute_result_coverage returns 0/0 when there are no producers.""" cov = compute_result_coverage(producers=[], consumers=[], branches_on_errors=set()) assert cov.total_producers == 0 assert cov.result_producers == 0 assert cov.total_consumers == 0 assert cov.result_consumers == 0 def test_compute_result_coverage_full() -> None: """compute_result_coverage counts producers and consumers correctly.""" f1 = FunctionRef(fqname="src.a", file="src/a.py", line=1, role="producer") f2 = FunctionRef(fqname="src.b", file="src/b.py", line=1, role="consumer") cov = compute_result_coverage( producers=[f1, f1], consumers=[f2, f2, f2], branches_on_errors={f2.fqname}, ) assert cov.total_producers == 2 assert cov.result_producers == 2 assert cov.total_consumers == 3 assert cov.result_consumers == 1 assert "100%" in cov.summary def test_compute_type_alias_coverage_no_sites() -> None: """compute_type_alias_coverage returns 0/0/0 when there are no sites.""" cov = compute_type_alias_coverage(total_sites=0, typed_sites=0) assert cov.total_sites == 0 assert cov.typed_sites == 0 assert cov.untyped_sites == 0 def test_compute_type_alias_coverage_partial() -> None: """compute_type_alias_coverage computes untyped_sites = total - typed.""" cov = compute_type_alias_coverage(total_sites=45, typed_sites=38) assert cov.total_sites == 45 assert cov.typed_sites == 38 assert cov.untyped_sites == 7 assert "84%" in cov.summary assert "16%" in cov.summary def test_aggregate_cross_audit_findings_empty() -> None: """aggregate_cross_audit_findings returns empty CrossAuditFindings for no findings.""" findings = aggregate_cross_audit_findings( audit_name="audit_weak_types", findings=[], example_file="", example_line=0, ) assert findings.weak_types == () assert findings.exception_handling == () def test_aggregate_cross_audit_findings_one_audit() -> None: """aggregate_cross_audit_findings puts 5 findings into the right bucket.""" findings_list = [ {"file": "src/a.py", "line": 1}, {"file": "src/b.py", "line": 2}, {"file": "src/c.py", "line": 3}, {"file": "src/d.py", "line": 4}, {"file": "src/e.py", "line": 5}, ] findings = aggregate_cross_audit_findings( audit_name="audit_weak_types", findings=findings_list, example_file="src/a.py", example_line=1, ) assert len(findings.weak_types) == 1 assert findings.weak_types[0].audit_script == "audit_weak_types" assert findings.weak_types[0].site_count == 5 def test_run_all_cross_audit_reads_missing_dir() -> None: """run_all_cross_audit_reads returns empty dicts when the dir is missing.""" result = run_all_cross_audit_reads("/nonexistent/audit_inputs") assert result == {} def test_run_all_cross_audit_reads_partial() -> None: """run_all_cross_audit_reads returns the inputs that exist; missing inputs are empty dicts.""" with tempfile.TemporaryDirectory() as tmp: (Path(tmp) / "audit_weak_types.json").write_text('{"findings": []}') (Path(tmp) / "audit_exception_handling.json").write_text('{"findings": []}') result = run_all_cross_audit_reads(tmp) assert "audit_weak_types" in result assert "audit_exception_handling" in result # Phase 8 Task 8.1 test def test_dsl_word_arity_v2_14_new_words() -> None: """DSL_WORD_ARITY_V2 has 14 new tagged words.""" expected_words = { "kind", "mem-dim", "fn-ref", "access-pattern", "ap-evidence", "frequency", "freq-evidence", "result-coverage", "type-alias-coverage", "cross-audit-finding", "cross-audit-findings", "decomp-cost", "opt-candidate", "is-candidate", } assert expected_words.issubset(set(DSL_WORD_ARITY_V2.keys())) assert DSL_WORD_ARITY_V2["kind"] == 1 assert DSL_WORD_ARITY_V2["fn-ref"] == 4 assert DSL_WORD_ARITY_V2["decomp-cost"] == 8