Private
Public Access
0
0
Files
manual_slop/tests/test_code_path_audit_phase78.py
T
ed e59334a303 feat(audit): implement Phase 7 cross-audit integration + Phase 8.1 DSL arity
Phase 7: read_input_json (stdlib I/O boundary), INPUT_JSON_CONTRACTS
(6 input sources), find_enclosing_function (3-tier mapping tier 1),
compute_result_coverage (cross-check of doeh), compute_type_alias_coverage
(cross-check of dss), aggregate_cross_audit_findings (per-aggregate
bucketing), run_all_cross_audit_reads (convenience).

Phase 8 Task 8.1: DSL_WORD_ARITY_V2 (14 new tagged words).

15 new unit tests passing. 111 total tests passing.

Phase 8 Tasks 8.2-8.5 (4 renderers + parser) next.
2026-06-22 01:49:14 -04:00

222 lines
7.6 KiB
Python

"""Tests for src.code_path_audit v2 - cross-audit integration + DSL."""
from __future__ import annotations
import ast
import textwrap
import tempfile
import json
from pathlib import Path
from collections import Counter
import pytest
from src.code_path_audit import (
AggregateKind,
MemoryDim,
AccessPattern,
Frequency,
RecommendedDirection,
FunctionRef,
AccessPatternEvidence,
FrequencyEvidence,
ResultCoverage,
TypeAliasCoverage,
CrossAuditFinding,
CrossAuditFindings,
DecompositionCost,
OptimizationCandidate,
AggregateProfile,
ProducerConsumerGraph,
P1_pass,
P2_pass,
P3_pass,
build_pcg,
CANONICAL_MEMORY_DIM,
MEMORY_DIM_FILE_HEURISTIC,
load_memory_dim_overrides,
file_origin_memory_dim,
classify_memory_dim,
WHOLE_STRUCT_KEY_THRESHOLD,
FIELD_BY_FIELD_KEY_THRESHOLD,
MIXED_DOMINANCE_THRESHOLD,
AGGREGATE_LEVEL_DOMINANCE_THRESHOLD,
is_whole_struct_access,
is_field_by_field_access,
is_hot_cold_split,
is_bulk_batched_access,
dominant_pattern,
detect_access_pattern,
detect_frequency_from_entry_point,
load_frequency_overrides,
estimate_call_frequency,
MICROSECOND_BUDGET_PER_LLM_TURN,
BRANCH_DISPATCH_OVERHEAD_US,
ALLOCATION_OVERHEAD_US,
DEAD_FIELD_COST_PER_FIELD_US,
COMPONENTIZATION_INDIRECTION_US,
UNIFICATION_INDIRECTION_US,
per_call_cost_us,
FREQUENCY_MULTIPLIER,
current_total_us,
componentize_factor,
unify_factor,
recommended_direction,
generate_rationale,
compute_decomposition_cost,
read_input_json,
INPUT_JSON_CONTRACTS,
find_enclosing_function,
compute_result_coverage,
compute_type_alias_coverage,
aggregate_cross_audit_findings,
run_all_cross_audit_reads,
DSL_WORD_ARITY_V2,
)
from src.result_types import Result, ErrorInfo, ErrorKind
# Phase 7 tests
def test_read_input_json_success() -> None:
    """A well-formed JSON file is read back as an ok Result carrying the parsed payload."""
    payload = {"findings": [{"file": "x.py", "line": 1}]}
    with tempfile.TemporaryDirectory() as workdir:
        json_path = Path(workdir) / "ok.json"
        json_path.write_text(json.dumps(payload))
        outcome = read_input_json(str(json_path))
        # The parsed data must round-trip to exactly what was serialized above.
        assert outcome.ok
        assert outcome.data == payload
def test_read_input_json_missing_file() -> None:
    """read_input_json reports a single NOT_FOUND error for a path that does not exist.

    The probed path lives inside a freshly created temporary directory, so it is
    guaranteed to be absent on every platform instead of relying on a hard-coded
    absolute path that merely happens not to exist.
    """
    with tempfile.TemporaryDirectory() as tmp:
        # Nothing is ever written here; the directory is new, so the file cannot exist.
        missing = Path(tmp) / "missing.json"
        result = read_input_json(str(missing))
        assert not result.ok
        assert len(result.errors) == 1
        assert result.errors[0].kind == ErrorKind.NOT_FOUND
def test_read_input_json_malformed_json() -> None:
    """read_input_json reports an INVALID_INPUT error when the file is not valid JSON.

    A file containing a truncated object literal is written to a temporary
    directory and read back; the Result must be not-ok and its first error must
    carry ErrorKind.INVALID_INPUT.
    """
    with tempfile.TemporaryDirectory() as tmp:
        p = Path(tmp) / "bad.json"
        p.write_text("{invalid json")
        result = read_input_json(str(p))
        assert not result.ok
        # Guard the index below: an empty error list should fail as a readable
        # assertion rather than surfacing as an IndexError.
        assert len(result.errors) >= 1
        assert result.errors[0].kind == ErrorKind.INVALID_INPUT
def test_input_json_contracts_6_entries() -> None:
    """INPUT_JSON_CONTRACTS holds six entries, including every expected input-source name."""
    expected_sources = (
        "audit_weak_types",
        "audit_exception_handling",
        "audit_optional_in_3_files",
        "audit_no_models_config_io",
        "audit_main_thread_imports",
        "type_registry",
    )
    assert len(INPUT_JSON_CONTRACTS) == 6
    # Check each name individually so a failure points at the missing source.
    for source_name in expected_sources:
        assert source_name in INPUT_JSON_CONTRACTS
def test_find_enclosing_function_match() -> None:
    """A finding at src/x.py:15 resolves to the ref declared at line 10, not the one at line 100."""
    enclosing = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
    later = FunctionRef(fqname="src.x.z", file="src/x.py", line=100, role="consumer")
    candidates = [enclosing, later]
    found = find_enclosing_function(file="src/x.py", line=15, function_refs=candidates)
    # Identity, not equality: the very same FunctionRef object must come back.
    assert found is enclosing
def test_find_enclosing_function_no_match() -> None:
    """A finding in a file that none of the refs belong to yields None."""
    only_ref = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
    # The lookup targets src/y.py while the only known ref lives in src/x.py.
    found = find_enclosing_function(file="src/y.py", line=15, function_refs=[only_ref])
    assert found is None
def test_compute_result_coverage_no_producers() -> None:
    """With no producers, consumers or error branches, every coverage counter is zero."""
    coverage = compute_result_coverage(
        producers=[],
        consumers=[],
        branches_on_errors=set(),
    )
    # Producer-side counters.
    assert coverage.total_producers == 0
    assert coverage.result_producers == 0
    # Consumer-side counters.
    assert coverage.total_consumers == 0
    assert coverage.result_consumers == 0
def test_compute_result_coverage_full() -> None:
    """Two producer entries and three consumer entries are tallied, with one consumer branching on errors."""
    producer = FunctionRef(fqname="src.a", file="src/a.py", line=1, role="producer")
    consumer = FunctionRef(fqname="src.b", file="src/b.py", line=1, role="consumer")
    coverage = compute_result_coverage(
        producers=[producer] * 2,
        consumers=[consumer] * 3,
        branches_on_errors={consumer.fqname},
    )
    # Producer-side counters.
    assert coverage.total_producers == 2
    assert coverage.result_producers == 2
    # Consumer-side counters: three entries passed in, one counted as a result consumer.
    assert coverage.total_consumers == 3
    assert coverage.result_consumers == 1
    assert "100%" in coverage.summary
def test_compute_type_alias_coverage_no_sites() -> None:
    """Zero total and zero typed sites yield zero total, typed and untyped counts."""
    coverage = compute_type_alias_coverage(
        total_sites=0,
        typed_sites=0,
    )
    assert coverage.total_sites == 0
    assert coverage.typed_sites == 0
    assert coverage.untyped_sites == 0
def test_compute_type_alias_coverage_partial() -> None:
    """38 typed sites out of 45 leaves 7 untyped, and the summary shows 84% and 16%."""
    coverage = compute_type_alias_coverage(
        total_sites=45,
        typed_sites=38,
    )
    assert coverage.total_sites == 45
    assert coverage.typed_sites == 38
    # 45 - 38 = 7 sites remain untyped.
    assert coverage.untyped_sites == 7
    summary_text = coverage.summary
    assert "84%" in summary_text
    assert "16%" in summary_text
def test_aggregate_cross_audit_findings_empty() -> None:
    """An empty findings list leaves the weak_types and exception_handling buckets as empty tuples."""
    call_args = {
        "audit_name": "audit_weak_types",
        "findings": [],
        "example_file": "",
        "example_line": 0,
    }
    buckets = aggregate_cross_audit_findings(**call_args)
    assert buckets.weak_types == ()
    assert buckets.exception_handling == ()
def test_aggregate_cross_audit_findings_one_audit() -> None:
    """Five audit_weak_types findings collapse into one weak_types entry with site_count 5."""
    # Builds src/a.py:1 through src/e.py:5, one finding per file.
    raw_findings = [
        {"file": f"src/{stem}.py", "line": line_no}
        for line_no, stem in enumerate("abcde", start=1)
    ]
    buckets = aggregate_cross_audit_findings(
        audit_name="audit_weak_types",
        findings=raw_findings,
        example_file="src/a.py",
        example_line=1,
    )
    assert len(buckets.weak_types) == 1
    sole_entry = buckets.weak_types[0]
    assert sole_entry.audit_script == "audit_weak_types"
    assert sole_entry.site_count == 5
def test_run_all_cross_audit_reads_missing_dir() -> None:
    """run_all_cross_audit_reads returns an empty dict when the input directory is missing.

    The probed directory is a never-created child of a fresh temporary directory,
    so it is guaranteed to be absent on every platform instead of relying on a
    hard-coded absolute path that merely happens not to exist.
    """
    with tempfile.TemporaryDirectory() as tmp:
        # The child directory is intentionally never created.
        missing_dir = Path(tmp) / "audit_inputs"
        result = run_all_cross_audit_reads(str(missing_dir))
        assert result == {}
def test_run_all_cross_audit_reads_partial() -> None:
    """run_all_cross_audit_reads includes every input whose JSON file is present in the directory.

    NOTE(review): only the two inputs written here are asserted; how absent
    inputs are represented in the result is not checked by this test.
    """
    present_inputs = ("audit_weak_types", "audit_exception_handling")
    with tempfile.TemporaryDirectory() as workdir:
        for input_name in present_inputs:
            (Path(workdir) / f"{input_name}.json").write_text('{"findings": []}')
        loaded = run_all_cross_audit_reads(workdir)
        assert "audit_weak_types" in loaded
        assert "audit_exception_handling" in loaded
# Phase 8 Task 8.1 test
def test_dsl_word_arity_v2_14_new_words() -> None:
    """DSL_WORD_ARITY_V2 contains the 14 tagged words, with spot-checked arities for three of them."""
    required_words = {
        "kind",
        "mem-dim",
        "fn-ref",
        "access-pattern",
        "ap-evidence",
        "frequency",
        "freq-evidence",
        "result-coverage",
        "type-alias-coverage",
        "cross-audit-finding",
        "cross-audit-findings",
        "decomp-cost",
        "opt-candidate",
        "is-candidate",
    }
    # Any word left over after removing the table's keys is missing from the table.
    absent_words = required_words.difference(DSL_WORD_ARITY_V2.keys())
    assert not absent_words
    assert DSL_WORD_ARITY_V2["kind"] == 1
    assert DSL_WORD_ARITY_V2["fn-ref"] == 4
    assert DSL_WORD_ARITY_V2["decomp-cost"] == 8