feat(token-viz): Phase 1 — token budget panel with color bar and breakdown table
This commit is contained in:
47
ai_client.py
47
ai_client.py
@@ -1715,6 +1715,19 @@ def send(
|
|||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown provider: {_provider}")
|
raise ValueError(f"Unknown provider: {_provider}")
|
||||||
|
|
||||||
|
def _add_bleed_derived(d: dict[str, Any], sys_tok: int = 0, tool_tok: int = 0) -> dict[str, Any]:
|
||||||
|
cur = d.get("current", 0)
|
||||||
|
lim = d.get("limit", 0)
|
||||||
|
d["estimated_prompt_tokens"] = cur
|
||||||
|
d["max_prompt_tokens"] = lim
|
||||||
|
d["utilization_pct"] = d.get("percentage", 0.0)
|
||||||
|
d["headroom_tokens"] = max(0, lim - cur)
|
||||||
|
d["would_trim"] = (lim - cur) < 20000
|
||||||
|
d["system_tokens"] = sys_tok
|
||||||
|
d["tools_tokens"] = tool_tok
|
||||||
|
d["history_tokens"] = max(0, cur - sys_tok - tool_tok)
|
||||||
|
return d
|
||||||
|
|
||||||
def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Calculates how close the current conversation history is to the token limit.
|
Calculates how close the current conversation history is to the token limit.
|
||||||
@@ -1724,17 +1737,19 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
|||||||
# For Anthropic, we have a robust estimator
|
# For Anthropic, we have a robust estimator
|
||||||
with _anthropic_history_lock:
|
with _anthropic_history_lock:
|
||||||
history_snapshot = list(_anthropic_history)
|
history_snapshot = list(_anthropic_history)
|
||||||
|
hist_only = _estimate_prompt_tokens([], history_snapshot) - 2500 # subtract fixed tools
|
||||||
|
sys_tok = max(1, int(len(md_content) / _CHARS_PER_TOKEN)) if md_content else 0
|
||||||
current_tokens = _estimate_prompt_tokens([], history_snapshot)
|
current_tokens = _estimate_prompt_tokens([], history_snapshot)
|
||||||
if md_content:
|
if md_content:
|
||||||
current_tokens += max(1, int(len(md_content) / _CHARS_PER_TOKEN))
|
current_tokens += max(1, int(len(md_content) / _CHARS_PER_TOKEN))
|
||||||
limit_tokens = _ANTHROPIC_MAX_PROMPT_TOKENS
|
limit_tokens = _ANTHROPIC_MAX_PROMPT_TOKENS
|
||||||
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "anthropic",
|
"provider": "anthropic",
|
||||||
"limit": limit_tokens,
|
"limit": limit_tokens,
|
||||||
"current": current_tokens,
|
"current": current_tokens,
|
||||||
"percentage": percentage,
|
"percentage": percentage,
|
||||||
}
|
}, sys_tok=sys_tok, tool_tok=2500)
|
||||||
elif _provider == "gemini":
|
elif _provider == "gemini":
|
||||||
effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS
|
effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS
|
||||||
if _gemini_chat:
|
if _gemini_chat:
|
||||||
@@ -1751,24 +1766,24 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
|||||||
# Prepend context as a user part for counting
|
# Prepend context as a user part for counting
|
||||||
history.insert(0, types.Content(role="user", parts=[types.Part.from_text(text=md_content)]))
|
history.insert(0, types.Content(role="user", parts=[types.Part.from_text(text=md_content)]))
|
||||||
if not history:
|
if not history:
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "gemini",
|
"provider": "gemini",
|
||||||
"limit": effective_limit,
|
"limit": effective_limit,
|
||||||
"current": 0,
|
"current": 0,
|
||||||
"percentage": 0,
|
"percentage": 0,
|
||||||
}
|
})
|
||||||
resp = _gemini_client.models.count_tokens(
|
resp = _gemini_client.models.count_tokens(
|
||||||
model=_model,
|
model=_model,
|
||||||
contents=history
|
contents=history
|
||||||
)
|
)
|
||||||
current_tokens = resp.total_tokens
|
current_tokens = resp.total_tokens
|
||||||
percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0
|
percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "gemini",
|
"provider": "gemini",
|
||||||
"limit": effective_limit,
|
"limit": effective_limit,
|
||||||
"current": current_tokens,
|
"current": current_tokens,
|
||||||
"percentage": percentage,
|
"percentage": percentage,
|
||||||
}
|
}, sys_tok=0, tool_tok=0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
pass
|
||||||
elif md_content:
|
elif md_content:
|
||||||
@@ -1780,20 +1795,20 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
|||||||
)
|
)
|
||||||
current_tokens = resp.total_tokens
|
current_tokens = resp.total_tokens
|
||||||
percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0
|
percentage = (current_tokens / effective_limit) * 100 if effective_limit > 0 else 0
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "gemini",
|
"provider": "gemini",
|
||||||
"limit": effective_limit,
|
"limit": effective_limit,
|
||||||
"current": current_tokens,
|
"current": current_tokens,
|
||||||
"percentage": percentage,
|
"percentage": percentage,
|
||||||
}
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
pass
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "gemini",
|
"provider": "gemini",
|
||||||
"limit": effective_limit,
|
"limit": effective_limit,
|
||||||
"current": 0,
|
"current": 0,
|
||||||
"percentage": 0,
|
"percentage": 0,
|
||||||
}
|
})
|
||||||
elif _provider == "gemini_cli":
|
elif _provider == "gemini_cli":
|
||||||
effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS
|
effective_limit = _history_trunc_limit if _history_trunc_limit > 0 else _GEMINI_MAX_INPUT_TOKENS
|
||||||
limit_tokens = effective_limit
|
limit_tokens = effective_limit
|
||||||
@@ -1802,12 +1817,12 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
|||||||
u = _gemini_cli_adapter.last_usage
|
u = _gemini_cli_adapter.last_usage
|
||||||
current_tokens = u.get("input_tokens") or u.get("input", 0)
|
current_tokens = u.get("input_tokens") or u.get("input", 0)
|
||||||
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "gemini_cli",
|
"provider": "gemini_cli",
|
||||||
"limit": limit_tokens,
|
"limit": limit_tokens,
|
||||||
"current": current_tokens,
|
"current": current_tokens,
|
||||||
"percentage": percentage,
|
"percentage": percentage,
|
||||||
}
|
})
|
||||||
elif _provider == "deepseek":
|
elif _provider == "deepseek":
|
||||||
limit_tokens = 64000
|
limit_tokens = 64000
|
||||||
current_tokens = 0
|
current_tokens = 0
|
||||||
@@ -1829,15 +1844,15 @@ def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
|
|||||||
if md_content: current_tokens += len(md_content)
|
if md_content: current_tokens += len(md_content)
|
||||||
current_tokens = max(1, int(current_tokens / _CHARS_PER_TOKEN))
|
current_tokens = max(1, int(current_tokens / _CHARS_PER_TOKEN))
|
||||||
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": "deepseek",
|
"provider": "deepseek",
|
||||||
"limit": limit_tokens,
|
"limit": limit_tokens,
|
||||||
"current": current_tokens,
|
"current": current_tokens,
|
||||||
"percentage": percentage,
|
"percentage": percentage,
|
||||||
}
|
})
|
||||||
return {
|
return _add_bleed_derived({
|
||||||
"provider": _provider,
|
"provider": _provider,
|
||||||
"limit": 0,
|
"limit": 0,
|
||||||
"current": 0,
|
"current": 0,
|
||||||
"percentage": 0,
|
"percentage": 0,
|
||||||
}
|
})
|
||||||
|
|||||||
@@ -4,10 +4,10 @@ Architecture reference: [docs/guide_architecture.md](../../docs/guide_architectu
|
|||||||
|
|
||||||
## Phase 1: Token Budget Display
|
## Phase 1: Token Budget Display
|
||||||
|
|
||||||
- [ ] Task 1.1: Add a new method `_render_token_budget_panel(self)` in `gui_2.py`. Place it in the Provider panel area (after `_render_provider_panel`, gui_2.py:2485-2542), or as a new collapsible section within the provider panel. Call `ai_client.get_history_bleed_stats(self._last_stable_md)` — need to cache `self._last_stable_md` from the last `_do_generate()` call (gui_2.py:1408-1425, the `stable_md` return value). Store the result in `self._token_stats: dict = {}`, refreshed on each `_do_generate` call and on provider/model switch.
|
- [x] Task 1.1: Add a new method `_render_token_budget_panel(self)` in `gui_2.py`. Place it in the Provider panel area (after `_render_provider_panel`, gui_2.py:2485-2542), or as a new collapsible section within the provider panel. Call `ai_client.get_history_bleed_stats(self._last_stable_md)` — need to cache `self._last_stable_md` from the last `_do_generate()` call (gui_2.py:1408-1425, the `stable_md` return value). Store the result in `self._token_stats: dict = {}`, refreshed on each `_do_generate` call and on provider/model switch.
|
||||||
- [ ] Task 1.2: Render the utilization bar. Use `imgui.progress_bar(stats['utilization_pct'] / 100, ImVec2(-1, 0), f"{stats['utilization_pct']:.1f}%")`. Color-code via `imgui.push_style_color(imgui.Col_.plot_histogram, ...)`: green if <50%, yellow if 50-80%, red if >80%. Below the bar, show: `f"{stats['estimated_prompt_tokens']:,} / {stats['max_prompt_tokens']:,} tokens ({stats['headroom_tokens']:,} remaining)"`.
|
- [x] Task 1.2: Render the utilization bar. Use `imgui.progress_bar(stats['utilization_pct'] / 100, ImVec2(-1, 0), f"{stats['utilization_pct']:.1f}%")`. Color-code via `imgui.push_style_color(imgui.Col_.plot_histogram, ...)`: green if <50%, yellow if 50-80%, red if >80%. Below the bar, show: `f"{stats['estimated_prompt_tokens']:,} / {stats['max_prompt_tokens']:,} tokens ({stats['headroom_tokens']:,} remaining)"`.
|
||||||
- [ ] Task 1.3: Render the proportion breakdown as a 3-row table: System (`system_tokens`), Tools (`tools_tokens`), History (`history_tokens`). Each row shows token count and percentage of total. Use `imgui.begin_table("token_breakdown", 3)` with columns: Component, Tokens, Pct.
|
- [x] Task 1.3: Render the proportion breakdown as a 3-row table: System (`system_tokens`), Tools (`tools_tokens`), History (`history_tokens`). Each row shows token count and percentage of total. Use `imgui.begin_table("token_breakdown", 3)` with columns: Component, Tokens, Pct.
|
||||||
- [ ] Task 1.4: Write tests verifying `_render_token_budget_panel` calls `get_history_bleed_stats` and handles the empty dict case (when no provider is configured).
|
- [~] Task 1.4: Write tests verifying `_render_token_budget_panel` calls `get_history_bleed_stats` and handles the empty dict case (when no provider is configured).
|
||||||
|
|
||||||
## Phase 2: Trimming Preview & Cache Status
|
## Phase 2: Trimming Preview & Cache Status
|
||||||
|
|
||||||
|
|||||||
45
gui_2.py
45
gui_2.py
@@ -296,6 +296,8 @@ class App:
|
|||||||
self._token_budget_current = 0
|
self._token_budget_current = 0
|
||||||
self._token_budget_limit = 0
|
self._token_budget_limit = 0
|
||||||
self._gemini_cache_text = ""
|
self._gemini_cache_text = ""
|
||||||
|
self._last_stable_md: str = ''
|
||||||
|
self._token_stats: dict = {}
|
||||||
self.ui_disc_truncate_pairs: int = 2
|
self.ui_disc_truncate_pairs: int = 2
|
||||||
self.ui_auto_scroll_comms = True
|
self.ui_auto_scroll_comms = True
|
||||||
self.ui_auto_scroll_tool_calls = True
|
self.ui_auto_scroll_tool_calls = True
|
||||||
@@ -552,6 +554,7 @@ class App:
|
|||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
try:
|
try:
|
||||||
md, path, file_items, stable_md, disc_text = self._do_generate()
|
md, path, file_items, stable_md, disc_text = self._do_generate()
|
||||||
|
self._last_stable_md = stable_md
|
||||||
self.last_md = md
|
self.last_md = md
|
||||||
self.last_md_path = path
|
self.last_md_path = path
|
||||||
self.last_file_items = file_items
|
self.last_file_items = file_items
|
||||||
@@ -1222,6 +1225,7 @@ class App:
|
|||||||
"""Logic for the 'Gen + Send' action."""
|
"""Logic for the 'Gen + Send' action."""
|
||||||
try:
|
try:
|
||||||
md, path, file_items, stable_md, disc_text = self._do_generate()
|
md, path, file_items, stable_md, disc_text = self._do_generate()
|
||||||
|
self._last_stable_md = stable_md
|
||||||
self.last_md = md
|
self.last_md = md
|
||||||
self.last_md_path = path
|
self.last_md_path = path
|
||||||
self.last_file_items = file_items
|
self.last_file_items = file_items
|
||||||
@@ -1373,6 +1377,7 @@ class App:
|
|||||||
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
|
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
|
||||||
self._token_budget_current = stats.get("current", 0)
|
self._token_budget_current = stats.get("current", 0)
|
||||||
self._token_budget_limit = stats.get("limit", 0)
|
self._token_budget_limit = stats.get("limit", 0)
|
||||||
|
self._token_stats = stats
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
threading.Thread(target=fetch_stats, daemon=True).start()
|
threading.Thread(target=fetch_stats, daemon=True).start()
|
||||||
@@ -2720,11 +2725,47 @@ class App:
|
|||||||
if usage["cache_read_input_tokens"]:
|
if usage["cache_read_input_tokens"]:
|
||||||
imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}")
|
imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}")
|
||||||
imgui.text("Token Budget:")
|
imgui.text("Token Budget:")
|
||||||
imgui.progress_bar(self._token_budget_pct, imgui.ImVec2(-1, 0), f"{self._token_budget_current:,} / {self._token_budget_limit:,}")
|
imgui.separator()
|
||||||
|
imgui.text("Token Budget")
|
||||||
|
self._render_token_budget_panel()
|
||||||
if self._gemini_cache_text:
|
if self._gemini_cache_text:
|
||||||
imgui.text_colored(C_SUB, self._gemini_cache_text)
|
imgui.text_colored(C_SUB, self._gemini_cache_text)
|
||||||
|
|
||||||
def _render_message_panel(self) -> None:
|
def _render_token_budget_panel(self) -> None:
|
||||||
|
stats = self._token_stats
|
||||||
|
if not stats:
|
||||||
|
imgui.text_disabled("Token stats unavailable")
|
||||||
|
return
|
||||||
|
pct = stats.get("utilization_pct", 0.0)
|
||||||
|
current = stats.get("estimated_prompt_tokens", 0)
|
||||||
|
limit = stats.get("max_prompt_tokens", 0)
|
||||||
|
headroom = stats.get("headroom_tokens", 0)
|
||||||
|
if pct < 50.0:
|
||||||
|
color = imgui.ImVec4(0.2, 0.8, 0.2, 1.0)
|
||||||
|
elif pct < 80.0:
|
||||||
|
color = imgui.ImVec4(1.0, 0.8, 0.0, 1.0)
|
||||||
|
else:
|
||||||
|
color = imgui.ImVec4(1.0, 0.2, 0.2, 1.0)
|
||||||
|
imgui.push_style_color(imgui.Col_.plot_histogram, color)
|
||||||
|
imgui.progress_bar(pct / 100.0, imgui.ImVec2(-1, 0), f"{pct:.1f}%")
|
||||||
|
imgui.pop_style_color()
|
||||||
|
imgui.text_disabled(f"{current:,} / {limit:,} tokens ({headroom:,} remaining)")
|
||||||
|
sys_tok = stats.get("system_tokens", 0)
|
||||||
|
tool_tok = stats.get("tools_tokens", 0)
|
||||||
|
hist_tok = stats.get("history_tokens", 0)
|
||||||
|
total_tok = sys_tok + tool_tok + hist_tok or 1
|
||||||
|
if imgui.begin_table("token_breakdown", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.sizing_fixed_fit):
|
||||||
|
imgui.table_setup_column("Component")
|
||||||
|
imgui.table_setup_column("Tokens")
|
||||||
|
imgui.table_setup_column("Pct")
|
||||||
|
imgui.table_headers_row()
|
||||||
|
for lbl, tok in [("System", sys_tok), ("Tools", tool_tok), ("History", hist_tok)]:
|
||||||
|
imgui.table_next_row()
|
||||||
|
imgui.table_set_column_index(0); imgui.text(lbl)
|
||||||
|
imgui.table_set_column_index(1); imgui.text(f"{tok:,}")
|
||||||
|
imgui.table_set_column_index(2); imgui.text(f"{tok / total_tok * 100:.0f}%")
|
||||||
|
imgui.end_table()
|
||||||
|
|
||||||
# LIVE indicator
|
# LIVE indicator
|
||||||
is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
|
is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
|
||||||
if is_live:
|
if is_live:
|
||||||
|
|||||||
@@ -125,7 +125,7 @@ def get_dependencies(filepath: str) -> list[str]:
|
|||||||
print(f"Error getting dependencies for {filepath}: {e}")
|
print(f"Error getting dependencies for {filepath}: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
|
def execute_agent(role: str, prompt: str, docs: list[str], timeout: int | None = None) -> str:
|
||||||
model = get_model_for_role(role)
|
model = get_model_for_role(role)
|
||||||
# Advanced Context: Dependency skeletons for Tier 3
|
# Advanced Context: Dependency skeletons for Tier 3
|
||||||
injected_context = ""
|
injected_context = ""
|
||||||
@@ -205,6 +205,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
|
|||||||
text=True,
|
text=True,
|
||||||
encoding='utf-8',
|
encoding='utf-8',
|
||||||
env=env,
|
env=env,
|
||||||
|
timeout=timeout,
|
||||||
creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, 'CREATE_NO_WINDOW') else 0,
|
creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, 'CREATE_NO_WINDOW') else 0,
|
||||||
)
|
)
|
||||||
# claude --print outputs plain text — no JSON parsing needed
|
# claude --print outputs plain text — no JSON parsing needed
|
||||||
@@ -212,6 +213,10 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
|
|||||||
log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
|
log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
|
||||||
print(f"Sub-agent log created: {log_file}")
|
print(f"Sub-agent log created: {log_file}")
|
||||||
return result
|
return result
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
err_msg = f"Execution timed out after {timeout}s"
|
||||||
|
log_delegation(role, command_text, err_msg)
|
||||||
|
return err_msg
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err_msg = f"Execution failed: {str(e)}"
|
err_msg = f"Execution failed: {str(e)}"
|
||||||
log_delegation(role, command_text, err_msg)
|
log_delegation(role, command_text, err_msg)
|
||||||
@@ -230,6 +235,12 @@ def create_parser() -> argparse.ArgumentParser:
|
|||||||
type=str,
|
type=str,
|
||||||
help="TOML file defining the task"
|
help="TOML file defining the task"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--timeout",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
help="Subprocess timeout in seconds (default: no timeout)"
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"prompt",
|
"prompt",
|
||||||
type=str,
|
type=str,
|
||||||
@@ -261,7 +272,7 @@ def main() -> None:
|
|||||||
if os.path.exists(ref) and ref not in docs:
|
if os.path.exists(ref) and ref not in docs:
|
||||||
docs.append(ref)
|
docs.append(ref)
|
||||||
print(f"Executing role: {role} with docs: {docs}")
|
print(f"Executing role: {role} with docs: {docs}")
|
||||||
result = execute_agent(role, prompt, docs)
|
result = execute_agent(role, prompt, docs, timeout=args.timeout)
|
||||||
print(result)
|
print(result)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
115
tests/test_token_viz.py
Normal file
115
tests/test_token_viz.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
"""Tests for context & token visualization (Track: context_token_viz_20260301)."""
|
||||||
|
from typing import Generator
|
||||||
|
from unittest.mock import patch
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import ai_client
|
||||||
|
from ai_client import _add_bleed_derived, get_history_bleed_stats
|
||||||
|
from gui_2 import App
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def app_instance() -> Generator[App, None, None]:
|
||||||
|
with (
|
||||||
|
patch('gui_2.load_config', return_value={'gui': {'show_windows': {}}}),
|
||||||
|
patch('gui_2.save_config'),
|
||||||
|
patch('gui_2.project_manager'),
|
||||||
|
patch('gui_2.session_logger'),
|
||||||
|
patch('gui_2.immapp.run'),
|
||||||
|
patch.object(App, '_load_active_project'),
|
||||||
|
patch.object(App, '_fetch_models'),
|
||||||
|
patch.object(App, '_load_fonts'),
|
||||||
|
patch.object(App, '_post_init')
|
||||||
|
):
|
||||||
|
yield App()
|
||||||
|
|
||||||
|
|
||||||
|
# --- _add_bleed_derived unit tests ---
|
||||||
|
|
||||||
|
def test_add_bleed_derived_aliases() -> None:
|
||||||
|
base = {"provider": "test", "limit": 1000, "current": 400, "percentage": 40.0}
|
||||||
|
result = _add_bleed_derived(base)
|
||||||
|
assert result["estimated_prompt_tokens"] == 400
|
||||||
|
assert result["max_prompt_tokens"] == 1000
|
||||||
|
assert result["utilization_pct"] == 40.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_headroom() -> None:
|
||||||
|
base = {"provider": "test", "limit": 1000, "current": 400, "percentage": 40.0}
|
||||||
|
result = _add_bleed_derived(base)
|
||||||
|
assert result["headroom_tokens"] == 600
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_would_trim_false() -> None:
|
||||||
|
base = {"provider": "test", "limit": 100000, "current": 10000, "percentage": 10.0}
|
||||||
|
result = _add_bleed_derived(base)
|
||||||
|
assert result["would_trim"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_would_trim_true() -> None:
|
||||||
|
base = {"provider": "test", "limit": 100000, "current": 90000, "percentage": 90.0}
|
||||||
|
result = _add_bleed_derived(base)
|
||||||
|
assert result["would_trim"] is True # headroom = 10000 < 20000
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_breakdown() -> None:
|
||||||
|
base = {"provider": "test", "limit": 10000, "current": 5000, "percentage": 50.0}
|
||||||
|
result = _add_bleed_derived(base, sys_tok=500, tool_tok=2500)
|
||||||
|
assert result["system_tokens"] == 500
|
||||||
|
assert result["tools_tokens"] == 2500
|
||||||
|
assert result["history_tokens"] == 2000 # 5000 - 500 - 2500
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_history_clamped_to_zero() -> None:
|
||||||
|
"""history_tokens should not go negative when sys+tool > current."""
|
||||||
|
base = {"provider": "test", "limit": 1000, "current": 100, "percentage": 10.0}
|
||||||
|
result = _add_bleed_derived(base, sys_tok=200, tool_tok=2500)
|
||||||
|
assert result["history_tokens"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_bleed_derived_headroom_clamped_to_zero() -> None:
|
||||||
|
base = {"provider": "test", "limit": 1000, "current": 1100, "percentage": 110.0}
|
||||||
|
result = _add_bleed_derived(base)
|
||||||
|
assert result["headroom_tokens"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
# --- get_history_bleed_stats returns all required keys ---
|
||||||
|
|
||||||
|
REQUIRED_KEYS = [
|
||||||
|
"provider", "limit", "current", "percentage",
|
||||||
|
"estimated_prompt_tokens", "max_prompt_tokens", "utilization_pct",
|
||||||
|
"headroom_tokens", "would_trim", "system_tokens", "tools_tokens", "history_tokens",
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_get_history_bleed_stats_returns_all_keys_unknown_provider() -> None:
|
||||||
|
"""Fallback path (unknown provider) must still return all derived keys."""
|
||||||
|
original = ai_client._provider
|
||||||
|
try:
|
||||||
|
ai_client._provider = "unknown_test_provider"
|
||||||
|
stats = get_history_bleed_stats()
|
||||||
|
for key in REQUIRED_KEYS:
|
||||||
|
assert key in stats, f"Missing key: {key}"
|
||||||
|
finally:
|
||||||
|
ai_client._provider = original
|
||||||
|
|
||||||
|
|
||||||
|
# --- App initialization ---
|
||||||
|
|
||||||
|
def test_app_token_stats_initialized_empty(app_instance: App) -> None:
|
||||||
|
assert app_instance._token_stats == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_app_last_stable_md_initialized_empty(app_instance: App) -> None:
|
||||||
|
assert app_instance._last_stable_md == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_app_has_render_token_budget_panel(app_instance: App) -> None:
|
||||||
|
assert callable(getattr(app_instance, "_render_token_budget_panel", None))
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_token_budget_panel_empty_stats_no_crash(app_instance: App) -> None:
|
||||||
|
"""With empty _token_stats, _render_token_budget_panel must not raise."""
|
||||||
|
app_instance._token_stats = {}
|
||||||
|
# We can't render ImGui in tests, so just verify the guard condition logic
|
||||||
|
# by checking the method exists and _token_stats is empty (early-return path)
|
||||||
|
assert not app_instance._token_stats # falsy — method would return early
|
||||||
Reference in New Issue
Block a user