Private
Public Access
0
0
Files
manual_slop/tests/test_code_path_audit_phase78.py
T
ed b385cd441b refactor(audit): remove dead DSL parser (DSL files no longer produced)
The v2 postfix DSL parser (DSL_WORD_ARITY_V2, _atom, to_dsl_v2, parse_dsl_v2)
was implemented during the 14-phase DSL plan but never reached production:
run_audit() (line ~1217 after this change) only writes .md files (AUDIT_REPORT.md
plus per-aggregate markdowns via to_markdown/to_tree), never .dsl files. The DSL
parser carried latent arity bugs (DSL_WORD_ARITY_V2 declared 5 for 'result-coverage'
but writer emits 4; 4 for 'type-alias-coverage' but writer emits 3) which would
have caused silent parse failures.

Also removed the now-unused 'import re' statement (was only used by parse_dsl_v2).
The 'from datetime import date as date_mod' is retained (still used at lines ~1259,
1275, and 1291 in the markdown renderer).

Tests deleted in lockstep:
- tests/test_code_path_audit_phase78.py: test_dsl_word_arity_v2_14_new_words
- tests/test_code_path_audit_phase89.py: test_to_dsl_v2_includes_aggregate_kind_section,
  test_parse_dsl_v2_round_trip_aggregate_kind, test_parse_dsl_v2_malformed

Verification:
- grep -c 'to_dsl_v2|parse_dsl_v2|DSL_WORD_ARITY_V2' src/code_path_audit.py = 0
- 127 of 127 remaining tests pass (was 131; -4 tests deleted)
2026-06-24 09:57:17 -04:00

208 lines
7.0 KiB
Python

"""Tests for src.code_path_audit v2 - cross-audit integration + DSL."""
from __future__ import annotations
import ast
import textwrap
import tempfile
import json
from pathlib import Path
from collections import Counter
import pytest
from src.code_path_audit import (
AggregateKind,
MemoryDim,
AccessPattern,
Frequency,
RecommendedDirection,
FunctionRef,
AccessPatternEvidence,
FrequencyEvidence,
ResultCoverage,
TypeAliasCoverage,
CrossAuditFinding,
CrossAuditFindings,
DecompositionCost,
OptimizationCandidate,
AggregateProfile,
ProducerConsumerGraph,
P1_pass,
P2_pass,
P3_pass,
build_pcg,
CANONICAL_MEMORY_DIM,
MEMORY_DIM_FILE_HEURISTIC,
load_memory_dim_overrides,
file_origin_memory_dim,
classify_memory_dim,
WHOLE_STRUCT_KEY_THRESHOLD,
FIELD_BY_FIELD_KEY_THRESHOLD,
MIXED_DOMINANCE_THRESHOLD,
AGGREGATE_LEVEL_DOMINANCE_THRESHOLD,
is_whole_struct_access,
is_field_by_field_access,
is_hot_cold_split,
is_bulk_batched_access,
dominant_pattern,
detect_access_pattern,
detect_frequency_from_entry_point,
load_frequency_overrides,
estimate_call_frequency,
MICROSECOND_BUDGET_PER_LLM_TURN,
BRANCH_DISPATCH_OVERHEAD_US,
ALLOCATION_OVERHEAD_US,
DEAD_FIELD_COST_PER_FIELD_US,
COMPONENTIZATION_INDIRECTION_US,
UNIFICATION_INDIRECTION_US,
per_call_cost_us,
FREQUENCY_MULTIPLIER,
current_total_us,
componentize_factor,
unify_factor,
recommended_direction,
generate_rationale,
compute_decomposition_cost,
read_input_json,
INPUT_JSON_CONTRACTS,
find_enclosing_function,
compute_result_coverage,
compute_type_alias_coverage,
aggregate_cross_audit_findings,
run_all_cross_audit_reads,
)
from src.result_types import Result, ErrorInfo, ErrorKind
# Phase 7 tests
def test_read_input_json_success() -> None:
    """A well-formed JSON file comes back as an ok Result carrying the parsed dict."""
    payload = {"findings": [{"file": "x.py", "line": 1}]}
    with tempfile.TemporaryDirectory() as scratch_dir:
        json_path = Path(scratch_dir) / "ok.json"
        json_path.write_text(json.dumps(payload))
        # Read while the temporary directory still exists.
        outcome = read_input_json(str(json_path))
        assert outcome.ok
        assert outcome.data == payload
def test_read_input_json_missing_file() -> None:
    """A path that does not exist yields a failed Result with one NOT_FOUND error."""
    absent_path = "/nonexistent/file.json"
    outcome = read_input_json(absent_path)
    assert not outcome.ok
    reported = outcome.errors
    assert len(reported) == 1
    assert reported[0].kind == ErrorKind.NOT_FOUND
def test_read_input_json_malformed_json() -> None:
    """Unparseable file content yields a failed Result whose first error is INVALID_INPUT."""
    with tempfile.TemporaryDirectory() as scratch_dir:
        broken_path = Path(scratch_dir) / "bad.json"
        # Deliberately truncated object literal: not valid JSON.
        broken_path.write_text("{invalid json")
        outcome = read_input_json(str(broken_path))
        assert not outcome.ok
        first_error = outcome.errors[0]
        assert first_error.kind == ErrorKind.INVALID_INPUT
def test_input_json_contracts_6_entries() -> None:
    """INPUT_JSON_CONTRACTS holds exactly six entries, one per expected input name."""
    expected_names = (
        "audit_weak_types",
        "audit_exception_handling",
        "audit_optional_in_3_files",
        "audit_no_models_config_io",
        "audit_main_thread_imports",
        "type_registry",
    )
    assert len(INPUT_JSON_CONTRACTS) == 6
    for contract_name in expected_names:
        assert contract_name in INPUT_JSON_CONTRACTS
def test_find_enclosing_function_match() -> None:
    """A finding at src/x.py:15 resolves to the ref declared at line 10, not the one at line 100."""
    enclosing = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
    later_in_file = FunctionRef(fqname="src.x.z", file="src/x.py", line=100, role="consumer")
    candidates = [enclosing, later_in_file]
    located = find_enclosing_function(file="src/x.py", line=15, function_refs=candidates)
    # Identity, not equality: the very same ref object must come back.
    assert located is enclosing
def test_find_enclosing_function_no_match() -> None:
    """A finding in a file that no ref belongs to resolves to None."""
    only_ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
    # The finding lives in src/y.py while the only known ref is in src/x.py.
    located = find_enclosing_function(file="src/y.py", line=15, function_refs=[only_ref])
    assert located is None
def test_compute_result_coverage_no_producers() -> None:
    """With empty producer/consumer inputs, every coverage counter is zero."""
    coverage = compute_result_coverage(producers=[], consumers=[], branches_on_errors=set())
    counter_fields = (
        "total_producers",
        "result_producers",
        "total_consumers",
        "result_consumers",
    )
    for field_name in counter_fields:
        assert getattr(coverage, field_name) == 0
def test_compute_result_coverage_full() -> None:
    """Two producer entries and three consumer entries give the expected counters and summary."""
    producer = FunctionRef(fqname="src.a", file="src/a.py", line=1, role="producer")
    consumer = FunctionRef(fqname="src.b", file="src/b.py", line=1, role="consumer")
    coverage = compute_result_coverage(
        producers=[producer] * 2,
        consumers=[consumer] * 3,
        branches_on_errors={consumer.fqname},
    )
    expected_counters = (
        ("total_producers", 2),
        ("result_producers", 2),
        ("total_consumers", 3),
        ("result_consumers", 1),
    )
    for field_name, expected in expected_counters:
        assert getattr(coverage, field_name) == expected
    assert "100%" in coverage.summary
def test_compute_type_alias_coverage_no_sites() -> None:
    """Zero total and zero typed sites produce zero for all three site counters."""
    coverage = compute_type_alias_coverage(total_sites=0, typed_sites=0)
    for field_name in ("total_sites", "typed_sites", "untyped_sites"):
        assert getattr(coverage, field_name) == 0
def test_compute_type_alias_coverage_partial() -> None:
    """38 typed out of 45 sites leaves 7 untyped; the summary mentions 84% and 16%."""
    total, typed = 45, 38
    coverage = compute_type_alias_coverage(total_sites=total, typed_sites=typed)
    assert coverage.total_sites == total
    assert coverage.typed_sites == typed
    assert coverage.untyped_sites == 7
    for percentage in ("84%", "16%"):
        assert percentage in coverage.summary
def test_aggregate_cross_audit_findings_empty() -> None:
    """An empty findings list leaves the weak_types and exception_handling buckets as empty tuples."""
    call_args = {
        "audit_name": "audit_weak_types",
        "findings": [],
        "example_file": "",
        "example_line": 0,
    }
    aggregated = aggregate_cross_audit_findings(**call_args)
    assert aggregated.weak_types == ()
    assert aggregated.exception_handling == ()
def test_aggregate_cross_audit_findings_one_audit() -> None:
    """Five weak-type sites collapse into a single weak_types entry with site_count 5."""
    # src/a.py:1 through src/e.py:5 -- one site per file.
    sites = [
        {"file": f"src/{stem}.py", "line": line_no}
        for line_no, stem in enumerate("abcde", start=1)
    ]
    aggregated = aggregate_cross_audit_findings(
        audit_name="audit_weak_types",
        findings=sites,
        example_file="src/a.py",
        example_line=1,
    )
    bucket = aggregated.weak_types
    assert len(bucket) == 1
    assert bucket[0].audit_script == "audit_weak_types"
    assert bucket[0].site_count == 5
def test_run_all_cross_audit_reads_missing_dir() -> None:
    """Pointing run_all_cross_audit_reads at a missing directory yields an empty dict."""
    absent_dir = "/nonexistent/audit_inputs"
    loaded = run_all_cross_audit_reads(absent_dir)
    assert loaded == {}
def test_run_all_cross_audit_reads_partial() -> None:
    """Inputs whose JSON files exist in the directory show up as keys in the result."""
    present_inputs = ("audit_weak_types", "audit_exception_handling")
    with tempfile.TemporaryDirectory() as scratch_dir:
        # Only two of the known inputs are written; the rest are left absent.
        for input_name in present_inputs:
            (Path(scratch_dir) / f"{input_name}.json").write_text('{"findings": []}')
        loaded = run_all_cross_audit_reads(scratch_dir)
        for input_name in present_inputs:
            assert input_name in loaded