diff --git a/src/code_path_audit.py b/src/code_path_audit.py index d925f8a9..6bd7b0d0 100644 --- a/src/code_path_audit.py +++ b/src/code_path_audit.py @@ -377,7 +377,7 @@ def detect_access_pattern( ) -> AccessPattern: """Detect the per-function access pattern. - Precedence: whole_struct > hot_cold_split > field_by_field > mixed. +Precedence: whole_struct > hot_cold_split > field_by_field > mixed. """ if is_whole_struct_access(field_counts, has_direct_access): return "whole_struct" @@ -386,4 +386,196 @@ def detect_access_pattern( return "hot_cold_split" if is_field_by_field_access(field_counts): return "field_by_field" - return "mixed" \ No newline at end of file + return "mixed" + +INIT_CALLERS = frozenset({"__init__", "warmup"}) +HOT_CALLERS = frozenset({"render_main_toolbar", "render_menu_bar", "render_frame", "update"}) +PER_TURN_CALLERS = frozenset({ + "_send_anthropic_result", "_send_deepseek_result", "_send_minimax_result", + "_send_qwen_result", "_send_grok_result", "_send_llama_result", + "_send_gemini_result", "_send_gemini_cli_result", + "process_user_request", "_handle_generate_send", +}) +COLD_CALLERS = frozenset({"cleanup", "reset_session", "_classify_anthropic_error", "_classify_gemini_error"}) +PER_DISCUSSION_CALLERS = frozenset({"save_project", "load_project", "save_snapshot", "load_snapshot"}) +PER_REQUEST_CALLERS = frozenset({ + "_api_get_key", "_api_status", "_api_performance", "_api_gui", + "_api_mma_status", "_api_comms", "_api_diagnostics", +}) + +def detect_frequency_from_entry_point(caller: str, caller_class: str) -> Frequency: + """Detect the call frequency from the caller name and class.""" + if caller in INIT_CALLERS: + return "init" + if caller in HOT_CALLERS: + return "hot" + if caller in PER_TURN_CALLERS: + return "per_turn" + if caller in COLD_CALLERS: + return "cold" + if caller in PER_DISCUSSION_CALLERS: + return "per_discussion" + if caller in PER_REQUEST_CALLERS: + return "per_request" + return "unknown" + +def 
load_frequency_overrides(path: str) -> dict[str, Frequency]: + """Load frequency overrides from a TOML file.""" + p = Path(path) + if not p.exists(): + return {} + with p.open("rb") as f: + data = tomllib.load(f) + out: dict[str, Frequency] = {} + for key, value in data.get("frequency", {}).items(): + if isinstance(value, str): + out[key] = value + return out + +def estimate_call_frequency( + function: FunctionRef, + callers: list[tuple[FunctionRef, str]], + overrides: dict[str, Frequency], +) -> Frequency: + """Estimate the call frequency of a function. + + Precedence: override > entry-point detector > unknown. + """ + if function.fqname in overrides: + return overrides[function.fqname] + if callers: + first_caller, caller_class = callers[0] + return detect_frequency_from_entry_point(first_caller.fqname.rsplit(".", 1)[-1], caller_class) + return "unknown" + +MICROSECOND_BUDGET_PER_LLM_TURN: int = 50_000 +BRANCH_DISPATCH_OVERHEAD_US: int = 100 +ALLOCATION_OVERHEAD_US: int = 50 +DEAD_FIELD_COST_PER_FIELD_US: int = 10 +COMPONENTIZATION_INDIRECTION_US: int = 200 +UNIFICATION_INDIRECTION_US: int = 300 + +def per_call_cost_us(struct_field_count: int, hot_path_field_count: int, struct_frozen: bool) -> int: + """Per-call cost in microseconds.""" + return ( + struct_field_count * ALLOCATION_OVERHEAD_US + + max(hot_path_field_count, 1) * BRANCH_DISPATCH_OVERHEAD_US + + (20 if struct_frozen else 0) + ) + +FREQUENCY_MULTIPLIER: dict[Frequency, float] = { + "hot": 60.0, + "per_turn": 1.0, + "per_request": 1.0, + "per_discussion": 1.0, + "cold": 0.01, + "init": 0.001, + "unknown": 0.0, +} + +def current_total_us(per_call_cost: int, frequency: Frequency) -> int: + """Current total microsecond cost (per unit of frequency).""" + return int(per_call_cost * FREQUENCY_MULTIPLIER[frequency]) + +def componentize_factor( + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + hot_field_count: int = 0, +) -> float: + """Determine the componentize factor per 
spec section 7.5.""" + if access_pattern == "field_by_field" and struct_field_count > 10 and not struct_frozen: + return 0.30 + if access_pattern == "hot_cold_split" and hot_field_count <= 2 and struct_field_count > 5: + return 0.40 + if access_pattern in ("whole_struct", "bulk_batched"): + return -0.20 + if access_pattern == "mixed": + return 0.0 + return -0.10 + +def unify_factor(access_pattern: AccessPattern, struct_field_count: int, struct_frozen: bool) -> float: + """Determine the unify factor per spec section 7.5.""" + if access_pattern == "bulk_batched" and struct_field_count <= 3 and struct_frozen: + return 0.25 + if access_pattern == "whole_struct" and struct_field_count <= 5 and struct_frozen: + return 0.15 + if access_pattern == "field_by_field": + return -0.30 + if access_pattern == "hot_cold_split": + return -0.10 + if access_pattern == "mixed": + return 0.0 + return 0.05 + +def recommended_direction( + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + frequency: Frequency, + hot_field_count: int = 0, +) -> RecommendedDirection: + """Determine the recommended decomposition direction per spec section 7.5. + + Frozen whole_struct is the ideal shape -> hold (overrides unify). 
+ """ + if access_pattern == "field_by_field" and struct_field_count > 10: + return "componentize" + if access_pattern == "hot_cold_split" and hot_field_count <= 2: + return "componentize" + if access_pattern == "bulk_batched" and struct_field_count <= 3: + return "unify" + if access_pattern == "whole_struct" and struct_field_count <= 5 and not struct_frozen: + return "unify" + if access_pattern == "mixed" or frequency == "unknown": + return "insufficient_data" + return "hold" + +def generate_rationale( + aggregate: str, + access_pattern: AccessPattern, + frequency: Frequency, + struct_field_count: int, + struct_frozen: bool, + direction: RecommendedDirection, +) -> str: + """Generate the auto-rationale string per spec section 7.5.""" + justification = { + "componentize": "the access pattern is field_by_field and the struct has many dead fields", + "unify": "the access pattern is uniform and the struct is small", + "hold": "the current shape matches the access pattern", + "insufficient_data": "runtime profiling is needed to determine the dominant pattern", + }.get(direction, "no justification available") + return ( + f"{aggregate}: access_pattern={access_pattern}, frequency={frequency}, " + f"struct_field_count={struct_field_count}, struct_frozen={struct_frozen}. " + f"Recommended: {direction} because {justification}." 
+ ) + +def compute_decomposition_cost( + aggregate: str, + access_pattern: AccessPattern, + struct_field_count: int, + struct_frozen: bool, + frequency: Frequency, + hot_field_count: int = 0, +) -> DecompositionCost: + """Compute the per-aggregate DecompositionCost.""" + per_call = per_call_cost_us(struct_field_count, hot_path_field_count=hot_field_count, struct_frozen=struct_frozen) + current_total = current_total_us(per_call, frequency) + direction = recommended_direction(access_pattern, struct_field_count, struct_frozen, frequency, hot_field_count) + c_factor = componentize_factor(access_pattern, struct_field_count, struct_frozen, hot_field_count) + u_factor = unify_factor(access_pattern, struct_field_count, struct_frozen) + c_savings = int(current_total * c_factor) if c_factor > 0 else 0 + u_savings = int(current_total * u_factor) if u_factor > 0 else 0 + rationale = generate_rationale(aggregate, access_pattern, frequency, struct_field_count, struct_frozen, direction) + return DecompositionCost( + current_cost_estimate=current_total, + componentize_savings=c_savings, + unify_savings=u_savings, + recommended_direction=direction, + recommended_rationale=rationale, + batch_size=None, + struct_field_count=struct_field_count, + struct_frozen=struct_frozen, + ) \ No newline at end of file diff --git a/tests/test_code_path_audit.py b/tests/test_code_path_audit.py index 012404b2..68324cd1 100644 --- a/tests/test_code_path_audit.py +++ b/tests/test_code_path_audit.py @@ -42,6 +42,23 @@ from src.code_path_audit import ( is_bulk_batched_access, dominant_pattern, detect_access_pattern, + detect_frequency_from_entry_point, + load_frequency_overrides, + estimate_call_frequency, + MICROSECOND_BUDGET_PER_LLM_TURN, + BRANCH_DISPATCH_OVERHEAD_US, + ALLOCATION_OVERHEAD_US, + DEAD_FIELD_COST_PER_FIELD_US, + COMPONENTIZATION_INDIRECTION_US, + UNIFICATION_INDIRECTION_US, + per_call_cost_us, + FREQUENCY_MULTIPLIER, + current_total_us, + componentize_factor, + unify_factor, + 
recommended_direction, + generate_rationale, + compute_decomposition_cost, ) from src.result_types import Result, ErrorInfo, ErrorKind @@ -608,4 +625,217 @@ def test_detect_access_pattern_mixed() -> None: """detect_access_pattern returns 'mixed' when no pattern dominates (2+ distinct keys but <3).""" counts: Counter[str] = Counter({"a": 1, "b": 1}) pattern = detect_access_pattern(counts, has_direct_access=False) - assert pattern == "mixed" \ No newline at end of file + assert pattern == "mixed" + +def test_detect_frequency_init() -> None: + """detect_frequency_from_entry_point returns 'init' for functions called from __init__.""" + freq = detect_frequency_from_entry_point(caller="__init__", caller_class="App") + assert freq == "init" + +def test_detect_frequency_hot() -> None: + """detect_frequency_from_entry_point returns 'hot' for functions called from render loops.""" + freq = detect_frequency_from_entry_point(caller="render_main_toolbar", caller_class="App") + assert freq == "hot" + +def test_detect_frequency_per_turn() -> None: + """detect_frequency_from_entry_point returns 'per_turn' for functions called from AI send paths.""" + freq = detect_frequency_from_entry_point(caller="_send_anthropic_result", caller_class="AIClient") + assert freq == "per_turn" + +def test_detect_frequency_cold() -> None: + """detect_frequency_from_entry_point returns 'cold' for functions called from cleanup.""" + freq = detect_frequency_from_entry_point(caller="cleanup", caller_class="AppController") + assert freq == "cold" + +def test_detect_frequency_per_discussion() -> None: + """detect_frequency_from_entry_point returns 'per_discussion' for save/load functions.""" + freq = detect_frequency_from_entry_point(caller="save_project", caller_class="ProjectManager") + assert freq == "per_discussion" + +def test_detect_frequency_unknown() -> None: + """detect_frequency_from_entry_point returns 'unknown' for unrecognized callers.""" + freq = 
detect_frequency_from_entry_point(caller="random_method", caller_class="X") + assert freq == "unknown" + +def test_load_frequency_overrides_empty() -> None: + """load_frequency_overrides returns {} for a missing file.""" + result = load_frequency_overrides("/nonexistent/overrides.toml") + assert result == {} + +def test_load_frequency_overrides_parses_toml() -> None: + """load_frequency_overrides parses 'fqname' = 'frequency' entries from the [frequency] table.""" + with tempfile.TemporaryDirectory() as tmp: + overrides_path = Path(tmp) / "overrides.toml" + overrides_path.write_text('[frequency]\n"src.cleanup.do_nothing" = "cold"\n') + result = load_frequency_overrides(str(overrides_path)) + assert result.get("src.cleanup.do_nothing") == "cold" + +def test_estimate_call_frequency_override_wins() -> None: + """estimate_call_frequency respects the override file's mapping.""" + f = FunctionRef(fqname="src.cleanup.do_nothing", file="src/cleanup.py", line=1, role="consumer") + freq = estimate_call_frequency( + f, + callers=[], + overrides={"src.cleanup.do_nothing": "cold"}, + ) + assert freq == "cold" + +def test_estimate_call_frequency_entry_point() -> None: + """estimate_call_frequency uses the entry-point detector when no override.""" + f = FunctionRef(fqname="src.x.y", file="src/x.py", line=1, role="consumer") + freq = estimate_call_frequency( + f, + callers=[(FunctionRef(fqname="src.app.App.__init__", file="src/app.py", line=1, role="producer"), "App")], + overrides={}, + ) + assert freq == "init" + +def test_estimate_call_frequency_unknown_no_callers() -> None: + """estimate_call_frequency returns 'unknown' for functions with no callers and no override.""" + f = FunctionRef(fqname="src.lonely.func", file="src/lonely.py", line=1, role="consumer") + freq = estimate_call_frequency(f, callers=[], overrides={}) + assert freq == "unknown" + +def test_cost_constants() -> None: + """The 6 cost-model constants are defined per spec section 7.5.""" + assert MICROSECOND_BUDGET_PER_LLM_TURN == 50_000 + assert 
BRANCH_DISPATCH_OVERHEAD_US == 100 + assert ALLOCATION_OVERHEAD_US == 50 + assert DEAD_FIELD_COST_PER_FIELD_US == 10 + assert COMPONENTIZATION_INDIRECTION_US == 200 + assert UNIFICATION_INDIRECTION_US == 300 + +def test_per_call_cost_us_no_frozen() -> None: + """per_call_cost_us = struct_field_count * 50 + max(fields_accessed_in_hot_path, 1) * 100 + 0 (not frozen).""" + cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=2, struct_frozen=False) + assert cost == 10 * 50 + 2 * 100 + +def test_per_call_cost_us_frozen() -> None: + """per_call_cost_us adds 20 for frozen dataclasses.""" + cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=2, struct_frozen=True) + assert cost == 10 * 50 + 2 * 100 + 20 + +def test_per_call_cost_us_min_hot_path() -> None: + """per_call_cost_us uses max(hot_path_field_count, 1) to avoid zero branch overhead.""" + cost = per_call_cost_us(struct_field_count=10, hot_path_field_count=0, struct_frozen=False) + assert cost == 10 * 50 + 1 * 100 + +def test_frequency_multiplier_7_values() -> None: + """FREQUENCY_MULTIPLIER has 7 entries.""" + assert FREQUENCY_MULTIPLIER["hot"] == 60 + assert FREQUENCY_MULTIPLIER["per_turn"] == 1 + assert FREQUENCY_MULTIPLIER["per_request"] == 1 + assert FREQUENCY_MULTIPLIER["per_discussion"] == 1 + assert FREQUENCY_MULTIPLIER["cold"] == 0.01 + assert FREQUENCY_MULTIPLIER["init"] == 0.001 + assert FREQUENCY_MULTIPLIER["unknown"] == 0 + +def test_current_total_us_per_turn() -> None: + """current_total_us = per_call_cost * frequency_multiplier for per_turn.""" + total = current_total_us(per_call_cost=500, frequency="per_turn") + assert total == 500 + +def test_current_total_us_hot() -> None: + """current_total_us = per_call_cost * 60 for hot.""" + total = current_total_us(per_call_cost=500, frequency="hot") + assert total == 30_000 + +def test_componentize_factor_field_by_field_large() -> None: + """componentize_factor=0.30 for field_by_field + struct_field_count > 10 + not frozen.""" + 
f = componentize_factor(access_pattern="field_by_field", struct_field_count=15, struct_frozen=False) + assert f == 0.30 + +def test_componentize_factor_hot_cold_split_small_hot() -> None: + """componentize_factor=0.40 for hot_cold_split + hot_field_count<=2 + struct_field_count>5.""" + f = componentize_factor(access_pattern="hot_cold_split", struct_field_count=8, struct_frozen=False, hot_field_count=2) + assert f == 0.40 + +def test_componentize_factor_whole_struct_negative() -> None: + """componentize_factor=-0.20 for whole_struct (splitting hurts).""" + f = componentize_factor(access_pattern="whole_struct", struct_field_count=5, struct_frozen=False) + assert f == -0.20 + +def test_componentize_factor_mixed_zero() -> None: + """componentize_factor=0.0 for mixed (insufficient evidence).""" + f = componentize_factor(access_pattern="mixed", struct_field_count=5, struct_frozen=False) + assert f == 0.0 + +def test_unify_factor_bulk_batched_small_frozen() -> None: + """unify_factor=0.25 for bulk_batched + struct_field_count <= 3 + frozen.""" + f = unify_factor(access_pattern="bulk_batched", struct_field_count=3, struct_frozen=True) + assert f == 0.25 + +def test_unify_factor_whole_struct_small_frozen() -> None: + """unify_factor=0.15 for whole_struct + struct_field_count <= 5 + frozen.""" + f = unify_factor(access_pattern="whole_struct", struct_field_count=5, struct_frozen=True) + assert f == 0.15 + +def test_unify_factor_field_by_field_negative() -> None: + """unify_factor=-0.30 for field_by_field (unification widens the data).""" + f = unify_factor(access_pattern="field_by_field", struct_field_count=15, struct_frozen=True) + assert f == -0.30 + +def test_unify_factor_mixed_zero() -> None: + """unify_factor=0.0 for mixed (insufficient evidence).""" + f = unify_factor(access_pattern="mixed", struct_field_count=5, struct_frozen=True) + assert f == 0.0 + +def test_recommended_direction_componentize_field_by_field() -> None: + """recommended_direction='componentize' for 
field_by_field + struct_field_count>10.""" + d = recommended_direction(access_pattern="field_by_field", struct_field_count=15, struct_frozen=False, frequency="per_turn", hot_field_count=0) + assert d == "componentize" + +def test_recommended_direction_unify_bulk_batched() -> None: + """recommended_direction='unify' for bulk_batched + struct_field_count<=3.""" + d = recommended_direction(access_pattern="bulk_batched", struct_field_count=3, struct_frozen=True, frequency="per_turn", hot_field_count=0) + assert d == "unify" + +def test_recommended_direction_insufficient_data_mixed() -> None: + """recommended_direction='insufficient_data' for mixed (needs runtime profiling).""" + d = recommended_direction(access_pattern="mixed", struct_field_count=5, struct_frozen=True, frequency="per_turn", hot_field_count=0) + assert d == "insufficient_data" + +def test_recommended_direction_hold_frozen_whole_struct() -> None: + """recommended_direction='hold' for frozen + whole_struct (ideal shape).""" + d = recommended_direction(access_pattern="whole_struct", struct_field_count=5, struct_frozen=True, frequency="per_turn", hot_field_count=0) + assert d == "hold" + +def test_generate_rationale_includes_pattern() -> None: + """generate_rationale includes the access pattern.""" + s = generate_rationale( + aggregate="Metadata", + access_pattern="field_by_field", + frequency="per_turn", + struct_field_count=15, + struct_frozen=False, + direction="componentize", + ) + assert "field_by_field" in s + assert "per_turn" in s + assert "componentize" in s + assert "Metadata" in s + +def test_compute_decomposition_cost_hold() -> None: + """compute_decomposition_cost returns 'hold' for the canonical frozen + whole_struct case.""" + cost = compute_decomposition_cost( + aggregate="Metadata", + access_pattern="whole_struct", + struct_field_count=8, + struct_frozen=True, + frequency="per_turn", + ) + assert cost.recommended_direction == "hold" + assert cost.struct_field_count == 8 + assert 
cost.struct_frozen is True + +def test_compute_decomposition_cost_componentize() -> None: + """compute_decomposition_cost returns 'componentize' for field_by_field + large struct.""" + cost = compute_decomposition_cost( + aggregate="BigStruct", + access_pattern="field_by_field", + struct_field_count=15, + struct_frozen=False, + frequency="per_turn", + ) + assert cost.recommended_direction == "componentize" + assert cost.componentize_savings > 0 \ No newline at end of file