Private
Public Access
0
0

feat(audit): implement Phase 5 CFE + Phase 6 Decomposition Cost (11 tasks)

Phase 5 CFE: detect_frequency_from_entry_point + 6 caller sets
(INIT/HOT/PER_TURN/COLD/PER_DISCUSSION/PER_REQUEST),
load_frequency_overrides (tomllib), estimate_call_frequency with
3-tier precedence (override > entry-point > unknown).

Phase 6 Decomposition Cost: 6 cost-model constants (per spec 7.5),
per_call_cost_us formula, FREQUENCY_MULTIPLIER (7 frequencies),
current_total_us, componentize_factor lookup, unify_factor lookup,
recommended_direction (5-step precedence with frozen whole_struct
-> hold override), generate_rationale auto-string, and
compute_decomposition_cost main entry.

33 new unit tests passing (Phase 5: 11, Phase 6: 22).
96 total tests passing.

Phase 7 (Cross-audit integration) next.
This commit is contained in:
2026-06-22 01:40:32 -04:00
parent 1f881dd518
commit cca59668c8
2 changed files with 425 additions and 3 deletions
+194 -2
View File
@@ -377,7 +377,7 @@ def detect_access_pattern(
) -> AccessPattern:
"""Detect the per-function access pattern.
Precedence: whole_struct > hot_cold_split > field_by_field > mixed.
Precedence: whole_struct > hot_cold_split > field_by_field > mixed.
"""
if is_whole_struct_access(field_counts, has_direct_access):
return "whole_struct"
@@ -386,4 +386,196 @@ def detect_access_pattern(
return "hot_cold_split"
if is_field_by_field_access(field_counts):
return "field_by_field"
return "mixed"
return "mixed"
# Caller-name sets consulted by detect_frequency_from_entry_point: each set
# lists the caller names that map to one frequency label.

# Callers that map to the "init" frequency.
INIT_CALLERS = frozenset(("__init__", "warmup"))

# Callers that map to the "hot" frequency.
HOT_CALLERS = frozenset((
    "render_main_toolbar",
    "render_menu_bar",
    "render_frame",
    "update",
))

# Callers that map to the "per_turn" frequency.
PER_TURN_CALLERS = frozenset((
    "_send_anthropic_result",
    "_send_deepseek_result",
    "_send_minimax_result",
    "_send_qwen_result",
    "_send_grok_result",
    "_send_llama_result",
    "_send_gemini_result",
    "_send_gemini_cli_result",
    "process_user_request",
    "_handle_generate_send",
))

# Callers that map to the "cold" frequency.
COLD_CALLERS = frozenset((
    "cleanup",
    "reset_session",
    "_classify_anthropic_error",
    "_classify_gemini_error",
))

# Callers that map to the "per_discussion" frequency.
PER_DISCUSSION_CALLERS = frozenset((
    "save_project",
    "load_project",
    "save_snapshot",
    "load_snapshot",
))

# Callers that map to the "per_request" frequency.
PER_REQUEST_CALLERS = frozenset((
    "_api_get_key",
    "_api_status",
    "_api_performance",
    "_api_gui",
    "_api_mma_status",
    "_api_comms",
    "_api_diagnostics",
))
def detect_frequency_from_entry_point(caller: str, caller_class: str) -> Frequency:
    """Map a caller name to the frequency label of the set that contains it.

    Args:
        caller: Caller name to look up in the ``*_CALLERS`` sets.
        caller_class: Accepted for interface compatibility; not consulted.

    Returns:
        The label of the first matching set, checked in the order
        init, hot, per_turn, cold, per_discussion, per_request;
        ``"unknown"`` when no set contains ``caller``.
    """
    # Order matters only if a name were ever listed in two sets; the first
    # match wins.
    lookup_order = (
        (INIT_CALLERS, "init"),
        (HOT_CALLERS, "hot"),
        (PER_TURN_CALLERS, "per_turn"),
        (COLD_CALLERS, "cold"),
        (PER_DISCUSSION_CALLERS, "per_discussion"),
        (PER_REQUEST_CALLERS, "per_request"),
    )
    for known_names, label in lookup_order:
        if caller in known_names:
            return label
    return "unknown"
def load_frequency_overrides(path: str) -> dict[str, Frequency]:
"""Load frequency overrides from a TOML file."""
p = Path(path)
if not p.exists():
return {}
with p.open("rb") as f:
data = tomllib.load(f)
out: dict[str, Frequency] = {}
for key, value in data.get("frequency", {}).items():
if isinstance(value, str):
out[key] = value
return out
def estimate_call_frequency(
    function: FunctionRef,
    callers: list[tuple[FunctionRef, str]],
    overrides: dict[str, Frequency],
) -> Frequency:
    """Estimate how often ``function`` is called.

    Precedence: override > entry-point detector > unknown.

    Args:
        function: Function being classified; its ``fqname`` is the override key.
        callers: ``(caller, caller_class)`` pairs. Only the first pair is
            consulted; later callers are ignored.
        overrides: Explicit frequency per ``fqname``.

    Returns:
        The override when present, otherwise the entry-point detection for
        the first caller, otherwise ``"unknown"``.
    """
    key = function.fqname
    if key in overrides:
        return overrides[key]
    if not callers:
        return "unknown"
    head_ref, head_class = callers[0]
    # The detector matches bare names, so keep only the last dotted segment.
    bare_name = head_ref.fqname.rpartition(".")[2]
    return detect_frequency_from_entry_point(bare_name, head_class)
# Cost-model constants. The *_US suffix marks values in microseconds.

# Terms of the per_call_cost_us formula.
ALLOCATION_OVERHEAD_US: int = 50
BRANCH_DISPATCH_OVERHEAD_US: int = 100

# Not referenced by the functions visible alongside these definitions.
MICROSECOND_BUDGET_PER_LLM_TURN: int = 50 * 1000
DEAD_FIELD_COST_PER_FIELD_US: int = 10
COMPONENTIZATION_INDIRECTION_US: int = 200
UNIFICATION_INDIRECTION_US: int = 300
def per_call_cost_us(struct_field_count: int, hot_path_field_count: int, struct_frozen: bool) -> int:
    """Return the modelled cost of one call, in microseconds.

    The cost is the sum of three terms:
      * ``ALLOCATION_OVERHEAD_US`` per struct field,
      * ``BRANCH_DISPATCH_OVERHEAD_US`` per hot-path field (at least one
        field is always charged),
      * a flat 20 when the struct is frozen.
    """
    allocation_us = struct_field_count * ALLOCATION_OVERHEAD_US
    charged_hot_fields = hot_path_field_count if hot_path_field_count > 1 else 1
    dispatch_us = charged_hot_fields * BRANCH_DISPATCH_OVERHEAD_US
    frozen_us = 20 if struct_frozen else 0
    return allocation_us + dispatch_us + frozen_us
# Weight applied to a per-call cost for each frequency label; used by
# current_total_us. "unknown" weighs 0.0, so an unclassified function
# contributes no cost.
FREQUENCY_MULTIPLIER: dict[Frequency, float] = dict((
    ("hot", 60.0),
    ("per_turn", 1.0),
    ("per_request", 1.0),
    ("per_discussion", 1.0),
    ("cold", 0.01),
    ("init", 0.001),
    ("unknown", 0.0),
))
def current_total_us(per_call_cost: int, frequency: Frequency) -> int:
    """Scale a per-call cost by the multiplier registered for ``frequency``.

    The product is truncated towards zero by ``int``. A ``frequency`` that
    is not a key of ``FREQUENCY_MULTIPLIER`` raises ``KeyError``.
    """
    weight = FREQUENCY_MULTIPLIER[frequency]
    weighted_cost = per_call_cost * weight
    return int(weighted_cost)
def componentize_factor(
access_pattern: AccessPattern,
struct_field_count: int,
struct_frozen: bool,
hot_field_count: int = 0,
) -> float:
"""Determine the componentize factor per spec section 7.5."""
if access_pattern == "field_by_field" and struct_field_count > 10 and not struct_frozen:
return 0.30
if access_pattern == "hot_cold_split" and hot_field_count <= 2 and struct_field_count > 5:
return 0.40
if access_pattern in ("whole_struct", "bulk_batched"):
return -0.20
if access_pattern == "mixed":
return 0.0
return -0.10
def unify_factor(access_pattern: AccessPattern, struct_field_count: int, struct_frozen: bool) -> float:
    """Look up the unify factor (spec section 7.5).

    Returns:
        0.25  for bulk_batched on a frozen struct with at most 3 fields;
        0.15  for whole_struct on a frozen struct with at most 5 fields;
        -0.30 for field_by_field;
        -0.10 for hot_cold_split;
        0.0   for mixed;
        0.05  in every other case.
    """
    fallback = 0.05
    if access_pattern == "bulk_batched":
        return 0.25 if (struct_frozen and struct_field_count <= 3) else fallback
    if access_pattern == "whole_struct":
        return 0.15 if (struct_frozen and struct_field_count <= 5) else fallback
    # The remaining patterns have a fixed factor regardless of struct shape.
    fixed_factors = {
        "field_by_field": -0.30,
        "hot_cold_split": -0.10,
        "mixed": 0.0,
    }
    return fixed_factors.get(access_pattern, fallback)
def recommended_direction(
    access_pattern: AccessPattern,
    struct_field_count: int,
    struct_frozen: bool,
    frequency: Frequency,
    hot_field_count: int = 0,
) -> RecommendedDirection:
    """Pick the decomposition direction (spec section 7.5).

    Rules are tried in order; the first that applies wins:
      1. componentize: field_by_field with more than 10 fields, or
         hot_cold_split with at most 2 hot fields;
      2. unify: bulk_batched with at most 3 fields, or an *unfrozen*
         whole_struct with at most 5 fields;
      3. insufficient_data: mixed pattern or unknown frequency;
      4. hold: everything else.

    A frozen whole_struct never matches rule 2, so it ends up as "hold"
    (or "insufficient_data" when the frequency is unknown).
    """
    should_componentize = (
        (access_pattern == "field_by_field" and struct_field_count > 10)
        or (access_pattern == "hot_cold_split" and hot_field_count <= 2)
    )
    if should_componentize:
        return "componentize"

    should_unify = (
        (access_pattern == "bulk_batched" and struct_field_count <= 3)
        or (access_pattern == "whole_struct" and struct_field_count <= 5 and not struct_frozen)
    )
    if should_unify:
        return "unify"

    lacks_data = access_pattern == "mixed" or frequency == "unknown"
    return "insufficient_data" if lacks_data else "hold"
def generate_rationale(
    aggregate: str,
    access_pattern: AccessPattern,
    frequency: Frequency,
    struct_field_count: int,
    struct_frozen: bool,
    direction: RecommendedDirection,
) -> str:
    """Generate the auto-rationale string per spec section 7.5.

    Args:
        aggregate: Name of the aggregate, used as the message prefix.
        access_pattern: Detected access pattern, echoed in the message.
        frequency: Estimated call frequency, echoed in the message.
        struct_field_count: Number of struct fields, echoed in the message.
        struct_frozen: Whether the struct is frozen, echoed in the message.
        direction: Recommended direction; selects the justification text.

    Returns:
        ``"<aggregate>: access_pattern=..., frequency=..., struct_field_count=...,
        struct_frozen=.... Recommended: <direction> because <justification>."``
        An unrecognised ``direction`` gets "no justification available".
    """
    if direction == "componentize" and access_pattern == "hot_cold_split":
        # "componentize" is also recommended for hot_cold_split; the generic
        # text below would claim the pattern is field_by_field and contradict
        # the access_pattern reported in the same sentence.
        justification = (
            "the access pattern is hot_cold_split and the hot fields can be "
            "separated from the cold ones"
        )
    else:
        justification = {
            "componentize": "the access pattern is field_by_field and the struct has many dead fields",
            "unify": "the access pattern is uniform and the struct is small",
            "hold": "the current shape matches the access pattern",
            "insufficient_data": "runtime profiling is needed to determine the dominant pattern",
        }.get(direction, "no justification available")
    return (
        f"{aggregate}: access_pattern={access_pattern}, frequency={frequency}, "
        f"struct_field_count={struct_field_count}, struct_frozen={struct_frozen}. "
        f"Recommended: {direction} because {justification}."
    )
def compute_decomposition_cost(
    aggregate: str,
    access_pattern: AccessPattern,
    struct_field_count: int,
    struct_frozen: bool,
    frequency: Frequency,
    hot_field_count: int = 0,
) -> DecompositionCost:
    """Assemble the ``DecompositionCost`` record for one aggregate.

    Combines the frequency-weighted cost, the recommended direction with its
    rationale, and the savings for each direction. A savings figure is the
    weighted cost times the matching factor, truncated by ``int``; it is 0
    when that factor is not positive. ``batch_size`` is always ``None``.
    """
    total_us = current_total_us(
        per_call_cost_us(struct_field_count, hot_field_count, struct_frozen),
        frequency,
    )
    verdict = recommended_direction(
        access_pattern, struct_field_count, struct_frozen, frequency, hot_field_count
    )

    split_factor = componentize_factor(
        access_pattern, struct_field_count, struct_frozen, hot_field_count
    )
    merge_factor = unify_factor(access_pattern, struct_field_count, struct_frozen)

    # Negative or zero factors mean "no benefit", never a negative saving.
    split_savings = 0
    if split_factor > 0:
        split_savings = int(total_us * split_factor)
    merge_savings = 0
    if merge_factor > 0:
        merge_savings = int(total_us * merge_factor)

    explanation = generate_rationale(
        aggregate, access_pattern, frequency, struct_field_count, struct_frozen, verdict
    )
    return DecompositionCost(
        current_cost_estimate=total_us,
        componentize_savings=split_savings,
        unify_savings=merge_savings,
        recommended_direction=verdict,
        recommended_rationale=explanation,
        batch_size=None,
        struct_field_count=struct_field_count,
        struct_frozen=struct_frozen,
    )