e59334a303
Phase 7: read_input_json (stdlib I/O boundary), INPUT_JSON_CONTRACTS (6 input sources), find_enclosing_function (3-tier mapping tier 1), compute_result_coverage (cross-check of doeh), compute_type_alias_coverage (cross-check of dss), aggregate_cross_audit_findings (per-aggregate bucketing), run_all_cross_audit_reads (convenience). Phase 8 Task 8.1: DSL_WORD_ARITY_V2 (14 new tagged words). 15 new unit tests passing. 111 total tests passing. Phase 8 Tasks 8.2-8.5 (4 renderers + parser) next.
222 lines
7.6 KiB
Python
222 lines
7.6 KiB
Python
"""Tests for src.code_path_audit v2 - cross-audit integration + DSL."""
|
|
from __future__ import annotations
|
|
import ast
|
|
import textwrap
|
|
import tempfile
|
|
import json
|
|
from pathlib import Path
|
|
from collections import Counter
|
|
import pytest
|
|
from src.code_path_audit import (
|
|
AggregateKind,
|
|
MemoryDim,
|
|
AccessPattern,
|
|
Frequency,
|
|
RecommendedDirection,
|
|
FunctionRef,
|
|
AccessPatternEvidence,
|
|
FrequencyEvidence,
|
|
ResultCoverage,
|
|
TypeAliasCoverage,
|
|
CrossAuditFinding,
|
|
CrossAuditFindings,
|
|
DecompositionCost,
|
|
OptimizationCandidate,
|
|
AggregateProfile,
|
|
ProducerConsumerGraph,
|
|
P1_pass,
|
|
P2_pass,
|
|
P3_pass,
|
|
build_pcg,
|
|
CANONICAL_MEMORY_DIM,
|
|
MEMORY_DIM_FILE_HEURISTIC,
|
|
load_memory_dim_overrides,
|
|
file_origin_memory_dim,
|
|
classify_memory_dim,
|
|
WHOLE_STRUCT_KEY_THRESHOLD,
|
|
FIELD_BY_FIELD_KEY_THRESHOLD,
|
|
MIXED_DOMINANCE_THRESHOLD,
|
|
AGGREGATE_LEVEL_DOMINANCE_THRESHOLD,
|
|
is_whole_struct_access,
|
|
is_field_by_field_access,
|
|
is_hot_cold_split,
|
|
is_bulk_batched_access,
|
|
dominant_pattern,
|
|
detect_access_pattern,
|
|
detect_frequency_from_entry_point,
|
|
load_frequency_overrides,
|
|
estimate_call_frequency,
|
|
MICROSECOND_BUDGET_PER_LLM_TURN,
|
|
BRANCH_DISPATCH_OVERHEAD_US,
|
|
ALLOCATION_OVERHEAD_US,
|
|
DEAD_FIELD_COST_PER_FIELD_US,
|
|
COMPONENTIZATION_INDIRECTION_US,
|
|
UNIFICATION_INDIRECTION_US,
|
|
per_call_cost_us,
|
|
FREQUENCY_MULTIPLIER,
|
|
current_total_us,
|
|
componentize_factor,
|
|
unify_factor,
|
|
recommended_direction,
|
|
generate_rationale,
|
|
compute_decomposition_cost,
|
|
read_input_json,
|
|
INPUT_JSON_CONTRACTS,
|
|
find_enclosing_function,
|
|
compute_result_coverage,
|
|
compute_type_alias_coverage,
|
|
aggregate_cross_audit_findings,
|
|
run_all_cross_audit_reads,
|
|
DSL_WORD_ARITY_V2,
|
|
)
|
|
from src.result_types import Result, ErrorInfo, ErrorKind
|
|
|
|
# Phase 7 tests
|
|
def test_read_input_json_success() -> None:
|
|
"""read_input_json returns Result[dict] on success."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
p = Path(tmp) / "ok.json"
|
|
p.write_text(json.dumps({"findings": [{"file": "x.py", "line": 1}]}))
|
|
result = read_input_json(str(p))
|
|
assert result.ok
|
|
assert result.data == {"findings": [{"file": "x.py", "line": 1}]}
|
|
|
|
def test_read_input_json_missing_file() -> None:
|
|
"""read_input_json returns Result with ErrorInfo when the file is missing."""
|
|
result = read_input_json("/nonexistent/file.json")
|
|
assert not result.ok
|
|
assert len(result.errors) == 1
|
|
assert result.errors[0].kind == ErrorKind.NOT_FOUND
|
|
|
|
def test_read_input_json_malformed_json() -> None:
|
|
"""read_input_json returns Result with ErrorInfo when the JSON is malformed."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
p = Path(tmp) / "bad.json"
|
|
p.write_text("{invalid json")
|
|
result = read_input_json(str(p))
|
|
assert not result.ok
|
|
assert result.errors[0].kind == ErrorKind.INVALID_INPUT
|
|
|
|
def test_input_json_contracts_6_entries() -> None:
|
|
"""INPUT_JSON_CONTRACTS has 6 entries."""
|
|
assert len(INPUT_JSON_CONTRACTS) == 6
|
|
assert "audit_weak_types" in INPUT_JSON_CONTRACTS
|
|
assert "audit_exception_handling" in INPUT_JSON_CONTRACTS
|
|
assert "audit_optional_in_3_files" in INPUT_JSON_CONTRACTS
|
|
assert "audit_no_models_config_io" in INPUT_JSON_CONTRACTS
|
|
assert "audit_main_thread_imports" in INPUT_JSON_CONTRACTS
|
|
assert "type_registry" in INPUT_JSON_CONTRACTS
|
|
|
|
def test_find_enclosing_function_match() -> None:
|
|
"""find_enclosing_function returns the function ref whose (file, line) range contains the finding."""
|
|
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
|
|
refs = [
|
|
f,
|
|
FunctionRef(fqname="src.x.z", file="src/x.py", line=100, role="consumer"),
|
|
]
|
|
result = find_enclosing_function(file="src/x.py", line=15, function_refs=refs)
|
|
assert result is f
|
|
|
|
def test_find_enclosing_function_no_match() -> None:
|
|
"""find_enclosing_function returns None when no function contains the finding's (file, line)."""
|
|
f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer")
|
|
refs = [f]
|
|
result = find_enclosing_function(file="src/y.py", line=15, function_refs=refs)
|
|
assert result is None
|
|
|
|
def test_compute_result_coverage_no_producers() -> None:
|
|
"""compute_result_coverage returns 0/0 when there are no producers."""
|
|
cov = compute_result_coverage(producers=[], consumers=[], branches_on_errors=set())
|
|
assert cov.total_producers == 0
|
|
assert cov.result_producers == 0
|
|
assert cov.total_consumers == 0
|
|
assert cov.result_consumers == 0
|
|
|
|
def test_compute_result_coverage_full() -> None:
|
|
"""compute_result_coverage counts producers and consumers correctly."""
|
|
f1 = FunctionRef(fqname="src.a", file="src/a.py", line=1, role="producer")
|
|
f2 = FunctionRef(fqname="src.b", file="src/b.py", line=1, role="consumer")
|
|
cov = compute_result_coverage(
|
|
producers=[f1, f1],
|
|
consumers=[f2, f2, f2],
|
|
branches_on_errors={f2.fqname},
|
|
)
|
|
assert cov.total_producers == 2
|
|
assert cov.result_producers == 2
|
|
assert cov.total_consumers == 3
|
|
assert cov.result_consumers == 1
|
|
assert "100%" in cov.summary
|
|
|
|
def test_compute_type_alias_coverage_no_sites() -> None:
|
|
"""compute_type_alias_coverage returns 0/0/0 when there are no sites."""
|
|
cov = compute_type_alias_coverage(total_sites=0, typed_sites=0)
|
|
assert cov.total_sites == 0
|
|
assert cov.typed_sites == 0
|
|
assert cov.untyped_sites == 0
|
|
|
|
def test_compute_type_alias_coverage_partial() -> None:
|
|
"""compute_type_alias_coverage computes untyped_sites = total - typed."""
|
|
cov = compute_type_alias_coverage(total_sites=45, typed_sites=38)
|
|
assert cov.total_sites == 45
|
|
assert cov.typed_sites == 38
|
|
assert cov.untyped_sites == 7
|
|
assert "84%" in cov.summary
|
|
assert "16%" in cov.summary
|
|
|
|
def test_aggregate_cross_audit_findings_empty() -> None:
|
|
"""aggregate_cross_audit_findings returns empty CrossAuditFindings for no findings."""
|
|
findings = aggregate_cross_audit_findings(
|
|
audit_name="audit_weak_types",
|
|
findings=[],
|
|
example_file="",
|
|
example_line=0,
|
|
)
|
|
assert findings.weak_types == ()
|
|
assert findings.exception_handling == ()
|
|
|
|
def test_aggregate_cross_audit_findings_one_audit() -> None:
|
|
"""aggregate_cross_audit_findings puts 5 findings into the right bucket."""
|
|
findings_list = [
|
|
{"file": "src/a.py", "line": 1},
|
|
{"file": "src/b.py", "line": 2},
|
|
{"file": "src/c.py", "line": 3},
|
|
{"file": "src/d.py", "line": 4},
|
|
{"file": "src/e.py", "line": 5},
|
|
]
|
|
findings = aggregate_cross_audit_findings(
|
|
audit_name="audit_weak_types",
|
|
findings=findings_list,
|
|
example_file="src/a.py",
|
|
example_line=1,
|
|
)
|
|
assert len(findings.weak_types) == 1
|
|
assert findings.weak_types[0].audit_script == "audit_weak_types"
|
|
assert findings.weak_types[0].site_count == 5
|
|
|
|
def test_run_all_cross_audit_reads_missing_dir() -> None:
|
|
"""run_all_cross_audit_reads returns empty dicts when the dir is missing."""
|
|
result = run_all_cross_audit_reads("/nonexistent/audit_inputs")
|
|
assert result == {}
|
|
|
|
def test_run_all_cross_audit_reads_partial() -> None:
|
|
"""run_all_cross_audit_reads returns the inputs that exist; missing inputs are empty dicts."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
(Path(tmp) / "audit_weak_types.json").write_text('{"findings": []}')
|
|
(Path(tmp) / "audit_exception_handling.json").write_text('{"findings": []}')
|
|
result = run_all_cross_audit_reads(tmp)
|
|
assert "audit_weak_types" in result
|
|
assert "audit_exception_handling" in result
|
|
|
|
# Phase 8 Task 8.1 test
|
|
def test_dsl_word_arity_v2_14_new_words() -> None:
|
|
"""DSL_WORD_ARITY_V2 has 14 new tagged words."""
|
|
expected_words = {
|
|
"kind", "mem-dim", "fn-ref", "access-pattern", "ap-evidence",
|
|
"frequency", "freq-evidence", "result-coverage", "type-alias-coverage",
|
|
"cross-audit-finding", "cross-audit-findings", "decomp-cost",
|
|
"opt-candidate", "is-candidate",
|
|
}
|
|
assert expected_words.issubset(set(DSL_WORD_ARITY_V2.keys()))
|
|
assert DSL_WORD_ARITY_V2["kind"] == 1
|
|
assert DSL_WORD_ARITY_V2["fn-ref"] == 4
|
|
assert DSL_WORD_ARITY_V2["decomp-cost"] == 8 |