diff --git a/ai_client.py b/ai_client.py index f62e810..ff47105 100644 --- a/ai_client.py +++ b/ai_client.py @@ -1715,6 +1715,19 @@ def send( else: raise ValueError(f"Unknown provider: {_provider}") +def _add_bleed_derived(d: dict[str, Any], sys_tok: int = 0, tool_tok: int = 0) -> dict[str, Any]: + cur = d.get("current", 0) + lim = d.get("limit", 0) + d["estimated_prompt_tokens"] = cur + d["max_prompt_tokens"] = lim + d["utilization_pct"] = d.get("percentage", 0.0) + d["headroom_tokens"] = max(0, lim - cur) + d["would_trim"] = (lim - cur) < 20000 + d["system_tokens"] = sys_tok + d["tools_tokens"] = tool_tok + d["history_tokens"] = max(0, cur - sys_tok - tool_tok) + return d + def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: """ Calculates how close the current conversation history is to the token limit. @@ -1724,17 +1737,19 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: # For Anthropic, we have a robust estimator with _anthropic_history_lock: history_snapshot = list(_anthropic_history) + hist_only = _estimate_prompt_tokens([], history_snapshot) - 2500 # subtract fixed tools + sys_tok = max(1, int(len(md_content) / _CHARS_PER_TOKEN)) if md_content else 0 current_tokens = _estimate_prompt_tokens([], history_snapshot) if md_content: current_tokens += max(1, int(len(md_content) / _CHARS_PER_TOKEN)) limit_tokens = _ANTHROPIC_MAX_PROMPT_TOKENS percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0 - return { + return _add_bleed_derived({ "provider": "anthropic", "limit": limit_tokens, "current": current_tokens, "percentage": percentage, - } + }, sys_tok=sys_tok, tool_tok=2500) elif _provider == "gemini": effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS if _gemini_chat: @@ -1751,24 +1766,24 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: # Prepend context as a user part for counting history.insert(0, 
types.Content(role="user", parts=[types.Part.from_text(text=md_content)])) if not history: - return { + return _add_bleed_derived({ "provider": "gemini", "limit": effective_limit, "current": 0, "percentage": 0, - } + }) resp = _gemini_client.models.count_tokens( model=_model, contents=history ) current_tokens = resp.total_tokens percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0 - return { + return _add_bleed_derived({ "provider": "gemini", "limit": effective_limit, "current": current_tokens, "percentage": percentage, - } + }, sys_tok=0, tool_tok=0) except Exception as e: pass elif md_content: @@ -1780,20 +1795,20 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: ) current_tokens = resp.total_tokens percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0 - return { + return _add_bleed_derived({ "provider": "gemini", "limit": effective_limit, "current": current_tokens, "percentage": percentage, - } + }) except Exception as e: pass - return { + return _add_bleed_derived({ "provider": "gemini", "limit": effective_limit, "current": 0, "percentage": 0, - } + }) elif _provider == "gemini_cli": effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS limit_tokens = effective_limit @@ -1802,12 +1817,12 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: u = _gemini_cli_adapter.last_usage current_tokens = u.get("input_tokens") or u.get("input", 0) percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0 - return { + return _add_bleed_derived({ "provider": "gemini_cli", "limit": limit_tokens, "current": current_tokens, "percentage": percentage, - } + }) elif _provider == "deepseek": limit_tokens = 64000 current_tokens = 0 @@ -1829,15 +1844,15 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]: if md_content: current_tokens += len(md_content) current_tokens = max(1, 
int(current_tokens / _CHARS_PER_TOKEN)) percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0 - return { + return _add_bleed_derived({ "provider": "deepseek", "limit": limit_tokens, "current": current_tokens, "percentage": percentage, - } - return { + }) + return _add_bleed_derived({ "provider": _provider, "limit": 0, "current": 0, "percentage": 0, - } + }) diff --git a/conductor/tracks/context_token_viz_20260301/plan.md b/conductor/tracks/context_token_viz_20260301/plan.md index e90ae62..3e35446 100644 --- a/conductor/tracks/context_token_viz_20260301/plan.md +++ b/conductor/tracks/context_token_viz_20260301/plan.md @@ -4,10 +4,10 @@ Architecture reference: [docs/guide_architecture.md](../../docs/guide_architectu ## Phase 1: Token Budget Display -- [ ] Task 1.1: Add a new method `_render_token_budget_panel(self)` in `gui_2.py`. Place it in the Provider panel area (after `_render_provider_panel`, gui_2.py:2485-2542), or as a new collapsible section within the provider panel. Call `ai_client.get_history_bleed_stats(self._last_stable_md)` — need to cache `self._last_stable_md` from the last `_do_generate()` call (gui_2.py:1408-1425, the `stable_md` return value). Store the result in `self._token_stats: dict = {}`, refreshed on each `_do_generate` call and on provider/model switch. -- [ ] Task 1.2: Render the utilization bar. Use `imgui.progress_bar(stats['utilization_pct'] / 100, ImVec2(-1, 0), f"{stats['utilization_pct']:.1f}%")`. Color-code via `imgui.push_style_color(imgui.Col_.plot_histogram, ...)`: green if <50%, yellow if 50-80%, red if >80%. Below the bar, show: `f"{stats['estimated_prompt_tokens']:,} / {stats['max_prompt_tokens']:,} tokens ({stats['headroom_tokens']:,} remaining)"`. -- [ ] Task 1.3: Render the proportion breakdown as a 3-row table: System (`system_tokens`), Tools (`tools_tokens`), History (`history_tokens`). Each row shows token count and percentage of total. 
Use `imgui.begin_table("token_breakdown", 3)` with columns: Component, Tokens, Pct. -- [ ] Task 1.4: Write tests verifying `_render_token_budget_panel` calls `get_history_bleed_stats` and handles the empty dict case (when no provider is configured). +- [x] Task 1.1: Add a new method `_render_token_budget_panel(self)` in `gui_2.py`. Place it in the Provider panel area (after `_render_provider_panel`, gui_2.py:2485-2542), or as a new collapsible section within the provider panel. Call `ai_client.get_history_bleed_stats(self._last_stable_md)` — need to cache `self._last_stable_md` from the last `_do_generate()` call (gui_2.py:1408-1425, the `stable_md` return value). Store the result in `self._token_stats: dict = {}`, refreshed on each `_do_generate` call and on provider/model switch. +- [x] Task 1.2: Render the utilization bar. Use `imgui.progress_bar(stats['utilization_pct'] / 100, ImVec2(-1, 0), f"{stats['utilization_pct']:.1f}%")`. Color-code via `imgui.push_style_color(imgui.Col_.plot_histogram, ...)`: green if <50%, yellow if 50-80%, red if >80%. Below the bar, show: `f"{stats['estimated_prompt_tokens']:,} / {stats['max_prompt_tokens']:,} tokens ({stats['headroom_tokens']:,} remaining)"`. +- [x] Task 1.3: Render the proportion breakdown as a 3-row table: System (`system_tokens`), Tools (`tools_tokens`), History (`history_tokens`). Each row shows token count and percentage of total. Use `imgui.begin_table("token_breakdown", 3)` with columns: Component, Tokens, Pct. +- [~] Task 1.4: Write tests verifying `_render_token_budget_panel` calls `get_history_bleed_stats` and handles the empty dict case (when no provider is configured). 
def _render_token_budget_panel(self) -> None:
    """Draw the token-budget bar and the System/Tools/History breakdown.

    Reads ``self._token_stats``. When it is empty a disabled
    "Token stats unavailable" line is drawn and nothing else. Otherwise
    the keys ``utilization_pct``, ``estimated_prompt_tokens``,
    ``max_prompt_tokens``, ``headroom_tokens``, ``system_tokens``,
    ``tools_tokens`` and ``history_tokens`` are read; a missing key
    counts as zero.
    """
    stats = self._token_stats
    if not stats:
        imgui.text_disabled("Token stats unavailable")
        return

    pct = stats.get("utilization_pct", 0.0)
    current = stats.get("estimated_prompt_tokens", 0)
    limit = stats.get("max_prompt_tokens", 0)
    headroom = stats.get("headroom_tokens", 0)

    # Traffic-light coloring: green below 50 %, yellow below 80 %, red otherwise.
    if pct < 50.0:
        color = imgui.ImVec4(0.2, 0.8, 0.2, 1.0)
    elif pct < 80.0:
        color = imgui.ImVec4(1.0, 0.8, 0.0, 1.0)
    else:
        color = imgui.ImVec4(1.0, 0.2, 0.2, 1.0)

    imgui.push_style_color(imgui.Col_.plot_histogram, color)
    try:
        imgui.progress_bar(pct / 100.0, imgui.ImVec2(-1, 0), f"{pct:.1f}%")
    finally:
        # Always undo the push so the style stack stays balanced even if
        # drawing the bar raises.
        imgui.pop_style_color()
    imgui.text_disabled(f"{current:,} / {limit:,} tokens ({headroom:,} remaining)")

    sys_tok = stats.get("system_tokens", 0)
    tool_tok = stats.get("tools_tokens", 0)
    hist_tok = stats.get("history_tokens", 0)
    # Percentages are relative to the sum of the three components; fall back
    # to 1 so an all-zero breakdown does not divide by zero.
    total_tok = (sys_tok + tool_tok + hist_tok) or 1

    table_flags = imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.sizing_fixed_fit
    if imgui.begin_table("token_breakdown", 3, table_flags):
        imgui.table_setup_column("Component")
        imgui.table_setup_column("Tokens")
        imgui.table_setup_column("Pct")
        imgui.table_headers_row()
        for lbl, tok in [("System", sys_tok), ("Tools", tool_tok), ("History", hist_tok)]:
            imgui.table_next_row()
            imgui.table_set_column_index(0)
            imgui.text(lbl)
            imgui.table_set_column_index(1)
            imgui.text(f"{tok:,}")
            imgui.table_set_column_index(2)
            imgui.text(f"{tok / total_tok * 100:.0f}%")
        imgui.end_table()
    # NOTE(review): this method ends here. The patch that introduced it removed
    # the `def _render_message_panel(self) -> None:` line that preceded the
    # "# LIVE indicator" code, so that code would otherwise run as the tail of
    # this method. Restore that header when applying the patch.
for Tier 3 injected_context = "" @@ -205,6 +205,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str: text=True, encoding='utf-8', env=env, + timeout=timeout, creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, 'CREATE_NO_WINDOW') else 0, ) # claude --print outputs plain text — no JSON parsing needed @@ -212,6 +213,10 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str: log_file = log_delegation(role, command_text, result, summary_prompt=prompt) print(f"Sub-agent log created: {log_file}") return result + except subprocess.TimeoutExpired: + err_msg = f"Execution timed out after {timeout}s" + log_delegation(role, command_text, err_msg) + return err_msg except Exception as e: err_msg = f"Execution failed: {str(e)}" log_delegation(role, command_text, err_msg) @@ -230,6 +235,12 @@ def create_parser() -> argparse.ArgumentParser: type=str, help="TOML file defining the task" ) + parser.add_argument( + "--timeout", + type=int, + default=None, + help="Subprocess timeout in seconds (default: no timeout)" + ) parser.add_argument( "prompt", type=str, @@ -261,7 +272,7 @@ def main() -> None: if os.path.exists(ref) and ref not in docs: docs.append(ref) print(f"Executing role: {role} with docs: {docs}") - result = execute_agent(role, prompt, docs) + result = execute_agent(role, prompt, docs, timeout=args.timeout) print(result) if __name__ == "__main__": diff --git a/tests/test_token_viz.py b/tests/test_token_viz.py new file mode 100644 index 0000000..b4d3531 --- /dev/null +++ b/tests/test_token_viz.py @@ -0,0 +1,115 @@ +"""Tests for context & token visualization (Track: context_token_viz_20260301).""" +from typing import Generator +from unittest.mock import patch +import pytest + +import ai_client +from ai_client import _add_bleed_derived, get_history_bleed_stats +from gui_2 import App + + +@pytest.fixture +def app_instance() -> Generator[App, None, None]: + with ( + patch('gui_2.load_config', return_value={'gui': {'show_windows': 
{}}}), + patch('gui_2.save_config'), + patch('gui_2.project_manager'), + patch('gui_2.session_logger'), + patch('gui_2.immapp.run'), + patch.object(App, '_load_active_project'), + patch.object(App, '_fetch_models'), + patch.object(App, '_load_fonts'), + patch.object(App, '_post_init') + ): + yield App() + + +# --- _add_bleed_derived unit tests --- + +def test_add_bleed_derived_aliases() -> None: + base = {"provider": "test", "limit": 1000, "current": 400, "percentage": 40.0} + result = _add_bleed_derived(base) + assert result["estimated_prompt_tokens"] == 400 + assert result["max_prompt_tokens"] == 1000 + assert result["utilization_pct"] == 40.0 + + +def test_add_bleed_derived_headroom() -> None: + base = {"provider": "test", "limit": 1000, "current": 400, "percentage": 40.0} + result = _add_bleed_derived(base) + assert result["headroom_tokens"] == 600 + + +def test_add_bleed_derived_would_trim_false() -> None: + base = {"provider": "test", "limit": 100000, "current": 10000, "percentage": 10.0} + result = _add_bleed_derived(base) + assert result["would_trim"] is False + + +def test_add_bleed_derived_would_trim_true() -> None: + base = {"provider": "test", "limit": 100000, "current": 90000, "percentage": 90.0} + result = _add_bleed_derived(base) + assert result["would_trim"] is True # headroom = 10000 < 20000 + + +def test_add_bleed_derived_breakdown() -> None: + base = {"provider": "test", "limit": 10000, "current": 5000, "percentage": 50.0} + result = _add_bleed_derived(base, sys_tok=500, tool_tok=2500) + assert result["system_tokens"] == 500 + assert result["tools_tokens"] == 2500 + assert result["history_tokens"] == 2000 # 5000 - 500 - 2500 + + +def test_add_bleed_derived_history_clamped_to_zero() -> None: + """history_tokens should not go negative when sys+tool > current.""" + base = {"provider": "test", "limit": 1000, "current": 100, "percentage": 10.0} + result = _add_bleed_derived(base, sys_tok=200, tool_tok=2500) + assert result["history_tokens"] == 0 + + 
def test_add_bleed_derived_headroom_clamped_to_zero() -> None:
    """headroom_tokens should not go negative when current exceeds limit."""
    base = {"provider": "test", "limit": 1000, "current": 1100, "percentage": 110.0}
    result = _add_bleed_derived(base)
    assert result["headroom_tokens"] == 0


# --- get_history_bleed_stats returns all required keys ---

REQUIRED_KEYS = [
    "provider", "limit", "current", "percentage",
    "estimated_prompt_tokens", "max_prompt_tokens", "utilization_pct",
    "headroom_tokens", "would_trim", "system_tokens", "tools_tokens", "history_tokens",
]


def test_get_history_bleed_stats_returns_all_keys_unknown_provider(monkeypatch) -> None:
    """Fallback path (unknown provider) must still return all derived keys."""
    # monkeypatch restores ai_client._provider after the test, even on failure.
    monkeypatch.setattr(ai_client, "_provider", "unknown_test_provider")
    stats = get_history_bleed_stats()
    missing = [key for key in REQUIRED_KEYS if key not in stats]
    assert not missing, f"Missing keys: {missing}"


# --- App initialization ---

def test_app_token_stats_initialized_empty(app_instance: App) -> None:
    assert app_instance._token_stats == {}


def test_app_last_stable_md_initialized_empty(app_instance: App) -> None:
    assert app_instance._last_stable_md == ""


def test_app_has_render_token_budget_panel(app_instance: App) -> None:
    assert callable(getattr(app_instance, "_render_token_budget_panel", None))


def test_render_token_budget_panel_empty_stats_no_crash(app_instance: App) -> None:
    """With empty _token_stats the panel draws only the placeholder line."""
    app_instance._token_stats = {}
    # ImGui cannot render without a live frame, so swap the module-level
    # `imgui` used by gui_2 for a mock and exercise the real method.
    with patch("gui_2.imgui") as mock_imgui:
        app_instance._render_token_budget_panel()
    mock_imgui.text_disabled.assert_called_once_with("Token stats unavailable")
    mock_imgui.progress_bar.assert_not_called()
    mock_imgui.begin_table.assert_not_called()