diff --git a/scripts/tier2/artifacts/code_path_audit_20260607/_append.py b/scripts/tier2/artifacts/code_path_audit_20260607/_append.py new file mode 100644 index 00000000..4a52cc9d --- /dev/null +++ b/scripts/tier2/artifacts/code_path_audit_20260607/_append.py @@ -0,0 +1,8 @@ +import sys +additions_file = sys.argv[1] +with open(additions_file, 'r', encoding='utf-8') as f: + code = f.read() +with open(r'C:\projects\manual_slop_tier2\src\code_path_audit.py', 'a', encoding='utf-8') as out: + out.write('\n\n') + out.write(code) +print('Appended', len(code), 'bytes') \ No newline at end of file diff --git a/scripts/tier2/artifacts/code_path_audit_20260607/_append_phase56.py b/scripts/tier2/artifacts/code_path_audit_20260607/_append_phase56.py new file mode 100644 index 00000000..d2c267c6 --- /dev/null +++ b/scripts/tier2/artifacts/code_path_audit_20260607/_append_phase56.py @@ -0,0 +1,9 @@ +import sys + +# Phase 5 + Phase 6 additions (read from sys.argv file) +additions_file = sys.argv[1] +with open(additions_file, 'r', encoding='utf-8') as f: + code = f.read() +with open(r'C:\projects\manual_slop_tier2\src\code_path_audit.py', 'a', encoding='utf-8') as out: + out.write(code) +print('Appended', len(code), 'bytes') \ No newline at end of file diff --git a/scripts/tier2/artifacts/code_path_audit_20260607/_phase56_additions.py b/scripts/tier2/artifacts/code_path_audit_20260607/_phase56_additions.py new file mode 100644 index 00000000..fe504d1f --- /dev/null +++ b/scripts/tier2/artifacts/code_path_audit_20260607/_phase56_additions.py @@ -0,0 +1,190 @@ +INIT_CALLERS = frozenset({"__init__", "warmup"}) +HOT_CALLERS = frozenset({"render_main_toolbar", "render_menu_bar", "render_frame", "update"}) +PER_TURN_CALLERS = frozenset({ + "_send_anthropic_result", "_send_deepseek_result", "_send_minimax_result", + "_send_qwen_result", "_send_grok_result", "_send_llama_result", + "_send_gemini_result", "_send_gemini_cli_result", + "process_user_request", "_handle_generate_send", +}) +COLD_CALLERS = frozenset({"cleanup", "reset_session", "_classify_anthropic_error", "_classify_gemini_error"}) +PER_DISCUSSION_CALLERS = frozenset({"save_project", "load_project", "save_snapshot", "load_snapshot"}) +PER_REQUEST_CALLERS = frozenset({ + "_api_get_key", "_api_status", "_api_performance", "_api_gui", + "_api_mma_status", "_api_comms", "_api_diagnostics", +}) + +def detect_frequency_from_entry_point(caller: str, caller_class: str) -> Frequency: + """Detect the call frequency from the caller name and class.""" + if caller in INIT_CALLERS: + return "init" + if caller in HOT_CALLERS: + return "hot" + if caller in PER_TURN_CALLERS: + return "per_turn" + if caller in COLD_CALLERS: + return "cold" + if caller in PER_DISCUSSION_CALLERS: + return "per_discussion" + if caller in PER_REQUEST_CALLERS: + return "per_request" + return "unknown" + +def load_frequency_overrides(path: str) -> dict[str, Frequency]: + """Load frequency overrides from a TOML file.""" + p = Path(path) + if not p.exists(): + return {} + with p.open("rb") as f: + data = tomllib.load(f) + out: dict[str, Frequency] = {} + for key, value in data.get("frequency", {}).items(): + if isinstance(value, str): + out[key] = value + return out + +def estimate_call_frequency( + function: FunctionRef, + callers: list[tuple[FunctionRef, str]], + overrides: dict[str, Frequency], +) -> Frequency: + """Estimate the call frequency of a function. + + Precedence: override > entry-point detector > unknown. + """ + if function.fqname in overrides: + return overrides[function.fqname] + if callers: + first_caller, caller_class = callers[0] + return detect_frequency_from_entry_point(first_caller.fqname.rsplit(".", 1)[-1], caller_class) + return "unknown" + +MICROSECOND_BUDGET_PER_LLM_TURN: int = 50_000 +BRANCH_DISPATCH_OVERHEAD_US: int = 100 +ALLOCATION_OVERHEAD_US: int = 50 +DEAD_FIELD_COST_PER_FIELD_US: int = 10 +COMPONENTIZATION_INDIRECTION_US: int = 200 +UNIFICATION_INDIRECTION_US: int = 300 + +def per_call_cost_us(struct_field_count: int, hot_path_field_count: int, struct_frozen: bool) -> int: + """Per-call cost in microseconds.""" + return ( + struct_field_count * ALLOCATION_OVERHEAD_US + + max(hot_path_field_count, 1) * BRANCH_DISPATCH_OVERHEAD_US + + (20 if struct_frozen else 0) + ) + +FREQUENCY_MULTIPLIER: dict[Frequency, float] = { + "hot": 60.0, + "per_turn": 1.0, + "per_request": 1.0, + "per_discussion": 1.0, + "cold": 0.01, + "init": 0.001, + "unknown": 0.0, +} + +def current_total_us(per_call_cost: int, frequency: Frequency) -> int: + """Current total microsecond cost (per unit of frequency).""" + return int(per_call_cost * FREQUENCY_MULTIPLIER[frequency]) + +def componentize_factor( + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + hot_field_count: int = 0, +) -> float: + """Determine the componentize factor per spec section 7.5.""" + if access_pattern == "field_by_field" and struct_field_count > 10 and not struct_frozen: + return 0.30 + if access_pattern == "hot_cold_split" and hot_field_count <= 2 and struct_field_count > 5: + return 0.40 + if access_pattern in ("whole_struct", "bulk_batched"): + return -0.20 + if access_pattern == "mixed": + return 0.0 + return -0.10 + +def unify_factor(access_pattern: AccessPattern, struct_field_count: int, struct_frozen: bool) -> float: + """Determine the unify factor per spec section 7.5.""" + if access_pattern == "bulk_batched" and struct_field_count <= 3 and struct_frozen: + return 0.25 + if access_pattern == "whole_struct" and struct_field_count <= 5 and struct_frozen: + return 0.15 + if access_pattern == "field_by_field": + return -0.30 + if access_pattern == "hot_cold_split": + return -0.10 + if access_pattern == "mixed": + return 0.0 + return 0.05 + +def recommended_direction( + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + frequency: Frequency, + hot_field_count: int = 0, +) -> RecommendedDirection: + """Determine the recommended decomposition direction per spec section 7.5.""" + if access_pattern == "field_by_field" and struct_field_count > 10: + return "componentize" + if access_pattern == "hot_cold_split" and hot_field_count <= 2: + return "componentize" + if access_pattern == "bulk_batched" and struct_field_count <= 3: + return "unify" + if access_pattern == "whole_struct" and struct_field_count <= 5: + return "unify" + if access_pattern == "mixed" or frequency == "unknown": + return "insufficient_data" + if struct_frozen and access_pattern == "whole_struct": + return "hold" + return "hold" + +def generate_rationale( + aggregate: str, + access_pattern: AccessPattern, + frequency: Frequency, + struct_field_count: int, + struct_frozen: bool, + direction: RecommendedDirection, +) -> str: + """Generate the auto-rationale string per spec section 7.5.""" + justification = { + "componentize": "the access pattern is field_by_field and the struct has many dead fields", + "unify": "the access pattern is uniform and the struct is small", + "hold": "the current shape matches the access pattern", + "insufficient_data": "runtime profiling is needed to determine the dominant pattern", + }.get(direction, "no justification available") + return ( + f"{aggregate}: access_pattern={access_pattern}, frequency={frequency}, " + f"struct_field_count={struct_field_count}, struct_frozen={struct_frozen}. " + f"Recommended: {direction} because {justification}." + ) + +def compute_decomposition_cost( + aggregate: str, + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + frequency: Frequency, + hot_field_count: int = 0, +) -> DecompositionCost: + """Compute the per-aggregate DecompositionCost.""" + per_call = per_call_cost_us(struct_field_count, hot_path_field_count=hot_field_count, struct_frozen=struct_frozen) + current_total = current_total_us(per_call, frequency) + direction = recommended_direction(access_pattern, struct_field_count, struct_frozen, frequency, hot_field_count) + c_factor = componentize_factor(access_pattern, struct_field_count, struct_frozen, hot_field_count) + u_factor = unify_factor(access_pattern, struct_field_count, struct_frozen) + c_savings = int(current_total * c_factor) if c_factor > 0 else 0 + u_savings = int(current_total * u_factor) if u_factor > 0 else 0 + rationale = generate_rationale(aggregate, access_pattern, frequency, struct_field_count, struct_frozen, direction) + return DecompositionCost( + current_cost_estimate=current_total, + componentize_savings=c_savings, + unify_savings=u_savings, + recommended_direction=direction, + recommended_rationale=rationale, + batch_size=None, + struct_field_count=struct_field_count, + struct_frozen=struct_frozen, + ) \ No newline at end of file diff --git a/scripts/tier2/artifacts/code_path_audit_20260607/_phase78_additions.py b/scripts/tier2/artifacts/code_path_audit_20260607/_phase78_additions.py new file mode 100644 index 00000000..158572a1 --- /dev/null +++ b/scripts/tier2/artifacts/code_path_audit_20260607/_phase78_additions.py @@ -0,0 +1,203 @@ +import json + +def read_input_json(path: str) -> Result[dict]: + """Read a JSON file and return Result[dict]. + + Per error_handling.md stdlib I/O boundary pattern: catches + OSError (missing/permission denied) and json.JSONDecodeError (malformed + JSON), converts to ErrorInfo. + """ + p = Path(path) + try: + raw = p.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError) as e: + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.NOT_FOUND, + message=f"Cannot read {path}: {e}", + source="read_input_json", + original=e, + )], + ) + try: + data = json.loads(raw) + except json.JSONDecodeError as e: + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.INVALID_INPUT, + message=f"Malformed JSON in {path}: {e}", + source="read_input_json", + original=e, + )], + ) + if not isinstance(data, dict): + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.INVALID_INPUT, + message=f"JSON root in {path} is not a dict", + source="read_input_json", + )], + ) + return Result(data=data) + +INPUT_JSON_CONTRACTS: dict[str, dict[str, str]] = { + "audit_weak_types": { + "producer": "scripts/audit_weak_types.py --json", + "filename": "audit_weak_types.json", + }, + "audit_exception_handling": { + "producer": "scripts/audit_exception_handling.py --json", + "filename": "audit_exception_handling.json", + }, + "audit_optional_in_3_files": { + "producer": "scripts/audit_optional_in_3_files.py --json", + "filename": "audit_optional_in_3_files.json", + }, + "audit_no_models_config_io": { + "producer": "scripts/audit_no_models_config_io.py --json", + "filename": "audit_no_models_config_io.json", + }, + "audit_main_thread_imports": { + "producer": "scripts/audit_main_thread_imports.py --json", + "filename": "audit_main_thread_imports.json", + }, + "type_registry": { + "producer": "scripts/generate_type_registry.py --json", + "filename": "type_registry.json", + }, +} + +def find_enclosing_function( + file: str, + line: int, + function_refs: list[FunctionRef], +) -> FunctionRef | None: + """Tier 1 of the 3-tier mapping: find the function ref at (file, line).""" + candidates = [r for r in function_refs if r.file == file and r.line <= line] + if not candidates: + return None + return max(candidates, key=lambda r: r.line) + +def compute_result_coverage( + producers: list[FunctionRef], + consumers: list[FunctionRef], + branches_on_errors: set[str], +) -> ResultCoverage: + """Compute the per-aggregate result coverage. + + result_producers: total number of producers (the caller is responsible + for filtering to Result[T] producers; this function reports the raw + count). + result_consumers: consumers whose fqname is in branches_on_errors + (the caller passes the set from AST analysis). + """ + total_producers = len(producers) + result_producers = total_producers + total_consumers = len(consumers) + result_consumers = len({c.fqname for c in consumers if c.fqname in branches_on_errors}) + if total_producers > 0 and result_producers == total_producers: + pct_p = 100 + else: + pct_p = (result_producers / total_producers * 100) if total_producers > 0 else 0 + pct_c = (result_consumers / total_consumers * 100) if total_consumers > 0 else 0 + summary = f"{result_producers}/{total_producers} producers return Result[T] ({pct_p:.0f}%); {result_consumers}/{total_consumers} consumers branch on .errors ({pct_c:.0f}%)" + return ResultCoverage( + total_producers=total_producers, + result_producers=result_producers, + total_consumers=total_consumers, + result_consumers=result_consumers, + summary=summary, + ) + +def compute_type_alias_coverage(total_sites: int, typed_sites: int) -> TypeAliasCoverage: + """Compute the per-aggregate type alias coverage.""" + untyped = total_sites - typed_sites + pct_typed = (typed_sites / total_sites * 100) if total_sites > 0 else 0 + pct_untyped = (untyped / total_sites * 100) if total_sites > 0 else 0 + summary = f"{total_sites} total sites; {typed_sites} typed ({pct_typed:.0f}%); {untyped} untyped ({pct_untyped:.0f}%)" + return TypeAliasCoverage( + total_sites=total_sites, + typed_sites=typed_sites, + untyped_sites=untyped, + summary=summary, + ) + +def aggregate_cross_audit_findings( + audit_name: str, + findings: list[dict], + example_file: str, + example_line: int, +) -> CrossAuditFindings: + """Aggregate audit findings into a per-aggregate CrossAuditFindings. + + Returns all-empty CrossAuditFindings when findings is empty (the + empty audit case is represented by 5 empty tuples, not 5 tuples + of zero-count CrossAuditFinding entries). + """ + empty = () + if not findings: + return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty) + site_count = len(findings) + note = f"{site_count} sites in producer+consumer functions" + finding = CrossAuditFinding( + audit_script=audit_name, + site_count=site_count, + example_file=example_file, + example_line=example_line, + note=note, + ) + buckets = { + "audit_weak_types": "weak_types", + "audit_exception_handling": "exception_handling", + "audit_optional_in_3_files": "optional_in_baseline", + "audit_no_models_config_io": "config_io_ownership", + "audit_main_thread_imports": "import_graph", + } + field = buckets.get(audit_name) + if field is None: + return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty) + kwargs = {f: empty for f in buckets.values()} + kwargs[field] = (finding,) + return CrossAuditFindings(**kwargs) + +def run_all_cross_audit_reads(audit_inputs_dir: str) -> dict[str, dict]: + """Read all 6 input JSONs from audit_inputs_dir. + + Returns a dict keyed by audit_name. Missing and malformed files + are tolerated (return empty dict). + """ + out: dict[str, dict] = {} + p = Path(audit_inputs_dir) + if not p.exists(): + return out + for audit_name, contract in INPUT_JSON_CONTRACTS.items(): + json_path = p / contract["filename"] + if not json_path.exists(): + out[audit_name] = {} + continue + result = read_input_json(str(json_path)) + if result.ok: + out[audit_name] = result.data + else: + out[audit_name] = {} + return out + +DSL_WORD_ARITY_V2: dict[str, int] = { + "kind": 1, + "mem-dim": 1, + "fn-ref": 4, + "access-pattern": 1, + "ap-evidence": 4, + "frequency": 1, + "freq-evidence": 4, + "result-coverage": 5, + "type-alias-coverage": 4, + "cross-audit-finding": 5, + "cross-audit-findings": 5, + "decomp-cost": 8, + "opt-candidate": 7, + "is-candidate": 1, +} \ No newline at end of file diff --git a/src/code_path_audit.py b/src/code_path_audit.py index 6bd7b0d0..05a8d41a 100644 --- a/src/code_path_audit.py +++ b/src/code_path_audit.py @@ -578,4 +578,211 @@ def compute_decomposition_cost( batch_size=None, struct_field_count=struct_field_count, struct_frozen=struct_frozen, - ) \ No newline at end of file + ) + +import json + + +import json + +def read_input_json(path: str) -> Result[dict]: + """Read a JSON file and return Result[dict]. + + Per error_handling.md stdlib I/O boundary pattern: catches + OSError (missing/permission denied) and json.JSONDecodeError (malformed + JSON), converts to ErrorInfo. + """ + p = Path(path) + try: + raw = p.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError) as e: + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.NOT_FOUND, + message=f"Cannot read {path}: {e}", + source="read_input_json", + original=e, + )], + ) + try: + data = json.loads(raw) + except json.JSONDecodeError as e: + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.INVALID_INPUT, + message=f"Malformed JSON in {path}: {e}", + source="read_input_json", + original=e, + )], + ) + if not isinstance(data, dict): + return Result( + data={}, + errors=[ErrorInfo( + kind=ErrorKind.INVALID_INPUT, + message=f"JSON root in {path} is not a dict", + source="read_input_json", + )], + ) + return Result(data=data) + +INPUT_JSON_CONTRACTS: dict[str, dict[str, str]] = { + "audit_weak_types": { + "producer": "scripts/audit_weak_types.py --json", + "filename": "audit_weak_types.json", + }, + "audit_exception_handling": { + "producer": "scripts/audit_exception_handling.py --json", + "filename": "audit_exception_handling.json", + }, + "audit_optional_in_3_files": { + "producer": "scripts/audit_optional_in_3_files.py --json", + "filename": "audit_optional_in_3_files.json", + }, + "audit_no_models_config_io": { + "producer": "scripts/audit_no_models_config_io.py --json", + "filename": "audit_no_models_config_io.json", + }, + "audit_main_thread_imports": { + "producer": "scripts/audit_main_thread_imports.py --json", + "filename": "audit_main_thread_imports.json", + }, + "type_registry": { + "producer": "scripts/generate_type_registry.py --json", + "filename": "type_registry.json", + }, +} + +def find_enclosing_function( + file: str, + line: int, + function_refs: list[FunctionRef], +) -> FunctionRef | None: + """Tier 1 of the 3-tier mapping: find the function ref at (file, line).""" + candidates = [r for r in function_refs if r.file == file and r.line <= line] + if not candidates: + return None + return max(candidates, key=lambda r: r.line) + +def compute_result_coverage( + producers: list[FunctionRef], + consumers: list[FunctionRef], + branches_on_errors: set[str], +) -> ResultCoverage: + """Compute the per-aggregate result coverage. + + result_producers: total number of producers (the caller is responsible + for filtering to Result[T] producers; this function reports the raw + count). + result_consumers: consumers whose fqname is in branches_on_errors + (the caller passes the set from AST analysis). + """ + total_producers = len(producers) + result_producers = total_producers + total_consumers = len(consumers) + result_consumers = len({c.fqname for c in consumers if c.fqname in branches_on_errors}) + if total_producers > 0 and result_producers == total_producers: + pct_p = 100 + else: + pct_p = (result_producers / total_producers * 100) if total_producers > 0 else 0 + pct_c = (result_consumers / total_consumers * 100) if total_consumers > 0 else 0 + summary = f"{result_producers}/{total_producers} producers return Result[T] ({pct_p:.0f}%); {result_consumers}/{total_consumers} consumers branch on .errors ({pct_c:.0f}%)" + return ResultCoverage( + total_producers=total_producers, + result_producers=result_producers, + total_consumers=total_consumers, + result_consumers=result_consumers, + summary=summary, + ) + +def compute_type_alias_coverage(total_sites: int, typed_sites: int) -> TypeAliasCoverage: + """Compute the per-aggregate type alias coverage.""" + untyped = total_sites - typed_sites + pct_typed = (typed_sites / total_sites * 100) if total_sites > 0 else 0 + pct_untyped = (untyped / total_sites * 100) if total_sites > 0 else 0 + summary = f"{total_sites} total sites; {typed_sites} typed ({pct_typed:.0f}%); {untyped} untyped ({pct_untyped:.0f}%)" + return TypeAliasCoverage( + total_sites=total_sites, + typed_sites=typed_sites, + untyped_sites=untyped, + summary=summary, + ) + +def aggregate_cross_audit_findings( + audit_name: str, + findings: list[dict], + example_file: str, + example_line: int, +) -> CrossAuditFindings: + """Aggregate audit findings into a per-aggregate CrossAuditFindings. + + Returns all-empty CrossAuditFindings when findings is empty (the + empty audit case is represented by 5 empty tuples, not 5 tuples + of zero-count CrossAuditFinding entries). + """ + empty = () + if not findings: + return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty) + site_count = len(findings) + note = f"{site_count} sites in producer+consumer functions" + finding = CrossAuditFinding( + audit_script=audit_name, + site_count=site_count, + example_file=example_file, + example_line=example_line, + note=note, + ) + buckets = { + "audit_weak_types": "weak_types", + "audit_exception_handling": "exception_handling", + "audit_optional_in_3_files": "optional_in_baseline", + "audit_no_models_config_io": "config_io_ownership", + "audit_main_thread_imports": "import_graph", + } + field = buckets.get(audit_name) + if field is None: + return CrossAuditFindings(weak_types=empty, exception_handling=empty, optional_in_baseline=empty, config_io_ownership=empty, import_graph=empty) + kwargs = {f: empty for f in buckets.values()} + kwargs[field] = (finding,) + return CrossAuditFindings(**kwargs) + +def run_all_cross_audit_reads(audit_inputs_dir: str) -> dict[str, dict]: + """Read all 6 input JSONs from audit_inputs_dir. + + Returns a dict keyed by audit_name. Missing and malformed files + are tolerated (return empty dict). + """ + out: dict[str, dict] = {} + p = Path(audit_inputs_dir) + if not p.exists(): + return out + for audit_name, contract in INPUT_JSON_CONTRACTS.items(): + json_path = p / contract["filename"] + if not json_path.exists(): + out[audit_name] = {} + continue + result = read_input_json(str(json_path)) + if result.ok: + out[audit_name] = result.data + else: + out[audit_name] = {} + return out + +DSL_WORD_ARITY_V2: dict[str, int] = { + "kind": 1, + "mem-dim": 1, + "fn-ref": 4, + "access-pattern": 1, + "ap-evidence": 4, + "frequency": 1, + "freq-evidence": 4, + "result-coverage": 5, + "type-alias-coverage": 4, + "cross-audit-finding": 5, + "cross-audit-findings": 5, + "decomp-cost": 8, + "opt-candidate": 7, + "is-candidate": 1, +} \ No newline at end of file diff --git a/tests/test_code_path_audit_phase78.py b/tests/test_code_path_audit_phase78.py new file mode 100644 index 00000000..c0db63a5 --- /dev/null +++ b/tests/test_code_path_audit_phase78.py @@ -0,0 +1,222 @@ +"""Tests for src.code_path_audit v2 - cross-audit integration + DSL.""" +from __future__ import annotations +import ast +import textwrap +import tempfile +import json +from pathlib import Path +from collections import Counter +import pytest +from src.code_path_audit import ( + AggregateKind, + MemoryDim, + AccessPattern, + Frequency, + RecommendedDirection, + FunctionRef, + AccessPatternEvidence, + FrequencyEvidence, + ResultCoverage, + TypeAliasCoverage, + CrossAuditFinding, + CrossAuditFindings, + DecompositionCost, + OptimizationCandidate, + AggregateProfile, + ProducerConsumerGraph, + P1_pass, + P2_pass, + P3_pass, + build_pcg, + CANONICAL_MEMORY_DIM, + MEMORY_DIM_FILE_HEURISTIC, + load_memory_dim_overrides, + file_origin_memory_dim, + classify_memory_dim, + WHOLE_STRUCT_KEY_THRESHOLD, + FIELD_BY_FIELD_KEY_THRESHOLD, + MIXED_DOMINANCE_THRESHOLD, + AGGREGATE_LEVEL_DOMINANCE_THRESHOLD, + is_whole_struct_access, + is_field_by_field_access, + is_hot_cold_split, + is_bulk_batched_access, + dominant_pattern, + detect_access_pattern, + detect_frequency_from_entry_point, + load_frequency_overrides, + estimate_call_frequency, + MICROSECOND_BUDGET_PER_LLM_TURN, + BRANCH_DISPATCH_OVERHEAD_US, + ALLOCATION_OVERHEAD_US, + DEAD_FIELD_COST_PER_FIELD_US, + COMPONENTIZATION_INDIRECTION_US, + UNIFICATION_INDIRECTION_US, + per_call_cost_us, + FREQUENCY_MULTIPLIER, + current_total_us, + componentize_factor, + unify_factor, + recommended_direction, + generate_rationale, + compute_decomposition_cost, + read_input_json, + INPUT_JSON_CONTRACTS, + find_enclosing_function, + compute_result_coverage, + compute_type_alias_coverage, + aggregate_cross_audit_findings, + run_all_cross_audit_reads, + DSL_WORD_ARITY_V2, +) +from src.result_types import Result, ErrorInfo, ErrorKind + +# Phase 7 tests +def test_read_input_json_success() -> None: + """read_input_json returns Result[dict] on success.""" + with tempfile.TemporaryDirectory() as tmp: + p = Path(tmp) / "ok.json" + p.write_text(json.dumps({"findings": [{"file": "x.py", "line": 1}]})) + result = read_input_json(str(p)) + assert result.ok + assert result.data == {"findings": [{"file": "x.py", "line": 1}]} + +def test_read_input_json_missing_file() -> None: + """read_input_json returns Result with ErrorInfo when the file is missing.""" + result = read_input_json("/nonexistent/file.json") + assert not result.ok + assert len(result.errors) == 1 + assert result.errors[0].kind == ErrorKind.NOT_FOUND + +def test_read_input_json_malformed_json() -> None: + """read_input_json returns Result with ErrorInfo when the JSON is malformed.""" + with tempfile.TemporaryDirectory() as tmp: + p = Path(tmp) / "bad.json" + p.write_text("{invalid json") + result = read_input_json(str(p)) + assert not result.ok + assert result.errors[0].kind == ErrorKind.INVALID_INPUT + +def test_input_json_contracts_6_entries() -> None: + """INPUT_JSON_CONTRACTS has 6 entries.""" + assert len(INPUT_JSON_CONTRACTS) == 6 + assert "audit_weak_types" in INPUT_JSON_CONTRACTS + assert "audit_exception_handling" in INPUT_JSON_CONTRACTS + assert "audit_optional_in_3_files" in INPUT_JSON_CONTRACTS + assert "audit_no_models_config_io" in INPUT_JSON_CONTRACTS + assert "audit_main_thread_imports" in INPUT_JSON_CONTRACTS + assert "type_registry" in INPUT_JSON_CONTRACTS + +def test_find_enclosing_function_match() -> None: + """find_enclosing_function returns the function ref whose (file, line) range contains the finding.""" + f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer") + refs = [ + f, + FunctionRef(fqname="src.x.z", file="src/x.py", line=100, role="consumer"), + ] + result = find_enclosing_function(file="src/x.py", line=15, function_refs=refs) + assert result is f + +def test_find_enclosing_function_no_match() -> None: + """find_enclosing_function returns None when no function contains the finding's (file, line).""" + f = FunctionRef(fqname="src.x.y", file="src/x.py", line=10, role="consumer") + refs = [f] + result = find_enclosing_function(file="src/y.py", line=15, function_refs=refs) + assert result is None + +def test_compute_result_coverage_no_producers() -> None: + """compute_result_coverage returns 0/0 when there are no producers.""" + cov = compute_result_coverage(producers=[], consumers=[], branches_on_errors=set()) + assert cov.total_producers == 0 + assert cov.result_producers == 0 + assert cov.total_consumers == 0 + assert cov.result_consumers == 0 + +def test_compute_result_coverage_full() -> None: + """compute_result_coverage counts producers and consumers correctly.""" + f1 = FunctionRef(fqname="src.a", file="src/a.py", line=1, role="producer") + f2 = FunctionRef(fqname="src.b", file="src/b.py", line=1, role="consumer") + cov = compute_result_coverage( + producers=[f1, f1], + consumers=[f2, f2, f2], + branches_on_errors={f2.fqname}, + ) + assert cov.total_producers == 2 + assert cov.result_producers == 2 + assert cov.total_consumers == 3 + assert cov.result_consumers == 1 + assert "100%" in cov.summary + +def test_compute_type_alias_coverage_no_sites() -> None: + """compute_type_alias_coverage returns 0/0/0 when there are no sites.""" + cov = compute_type_alias_coverage(total_sites=0, typed_sites=0) + assert cov.total_sites == 0 + assert cov.typed_sites == 0 + assert cov.untyped_sites == 0 + +def test_compute_type_alias_coverage_partial() -> None: + """compute_type_alias_coverage computes untyped_sites = total - typed.""" + cov = compute_type_alias_coverage(total_sites=45, typed_sites=38) + assert cov.total_sites == 45 + assert cov.typed_sites == 38 + assert cov.untyped_sites == 7 + assert "84%" in cov.summary + assert "16%" in cov.summary + +def test_aggregate_cross_audit_findings_empty() -> None: + """aggregate_cross_audit_findings returns empty CrossAuditFindings for no findings.""" + findings = aggregate_cross_audit_findings( + audit_name="audit_weak_types", + findings=[], + example_file="", + example_line=0, + ) + assert findings.weak_types == () + assert findings.exception_handling == () + +def test_aggregate_cross_audit_findings_one_audit() -> None: + """aggregate_cross_audit_findings puts 5 findings into the right bucket.""" + findings_list = [ + {"file": "src/a.py", "line": 1}, + {"file": "src/b.py", "line": 2}, + {"file": "src/c.py", "line": 3}, + {"file": "src/d.py", "line": 4}, + {"file": "src/e.py", "line": 5}, + ] + findings = aggregate_cross_audit_findings( + audit_name="audit_weak_types", + findings=findings_list, + example_file="src/a.py", + example_line=1, + ) + assert len(findings.weak_types) == 1 + assert findings.weak_types[0].audit_script == "audit_weak_types" + assert findings.weak_types[0].site_count == 5 + +def test_run_all_cross_audit_reads_missing_dir() -> None: + """run_all_cross_audit_reads returns empty dicts when the dir is missing.""" + result = run_all_cross_audit_reads("/nonexistent/audit_inputs") + assert result == {} + +def test_run_all_cross_audit_reads_partial() -> None: + """run_all_cross_audit_reads returns the inputs that exist; missing inputs are empty dicts.""" + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / "audit_weak_types.json").write_text('{"findings": []}') + (Path(tmp) / "audit_exception_handling.json").write_text('{"findings": []}') + result = run_all_cross_audit_reads(tmp) + assert "audit_weak_types" in result + assert "audit_exception_handling" in result + +# Phase 8 Task 8.1 test +def test_dsl_word_arity_v2_14_new_words() -> None: + """DSL_WORD_ARITY_V2 has 14 new tagged words.""" + expected_words = { + "kind", "mem-dim", "fn-ref", "access-pattern", "ap-evidence", + "frequency", "freq-evidence", "result-coverage", "type-alias-coverage", + "cross-audit-finding", "cross-audit-findings", "decomp-cost", + "opt-candidate", "is-candidate", + } + assert expected_words.issubset(set(DSL_WORD_ARITY_V2.keys())) + assert DSL_WORD_ARITY_V2["kind"] == 1 + assert DSL_WORD_ARITY_V2["fn-ref"] == 4 + assert DSL_WORD_ARITY_V2["decomp-cost"] == 8 \ No newline at end of file