Compare commits
15 Commits
93e72b5530
...
2da1ef38af
| Author | SHA1 | Date | |
|---|---|---|---|
| 2da1ef38af | |||
| 40fc35f176 | |||
| 1a428e3c6a | |||
| 66f728e7a3 | |||
| eaaf09dc3c | |||
| abc0639602 | |||
| b792e34a64 | |||
| 8caebbd226 | |||
| 2dd6145bd8 | |||
| 0c27aa6c6b | |||
| e24664c7b2 | |||
| 20ebab55a0 | |||
| c44026c06c | |||
| 776f4e4370 | |||
| cd3f3c89ed |
@@ -19,6 +19,8 @@ from pathlib import Path
|
||||
import file_cache
|
||||
import mcp_client
|
||||
import google.genai
|
||||
from google.genai import types
|
||||
from events import EventEmitter
|
||||
|
||||
_provider: str = "gemini"
|
||||
_model: str = "gemini-2.5-flash"
|
||||
@@ -27,6 +29,9 @@ _max_tokens: int = 8192
|
||||
|
||||
_history_trunc_limit: int = 8000
|
||||
|
||||
# Global event emitter for API lifecycle events
|
||||
events = EventEmitter()
|
||||
|
||||
def set_model_params(temp: float, max_tok: int, trunc_limit: int = 8000):
|
||||
global _temperature, _max_tokens, _history_trunc_limit
|
||||
_temperature = temp
|
||||
@@ -616,6 +621,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
|
||||
r["output"] = val
|
||||
|
||||
for r_idx in range(MAX_TOOL_ROUNDS + 2):
|
||||
events.emit("request_start", payload={"provider": "gemini", "model": _model, "round": r_idx})
|
||||
resp = _gemini_chat.send_message(payload)
|
||||
txt = "\n".join(p.text for c in resp.candidates if getattr(c, "content", None) for p in c.content.parts if hasattr(p, "text") and p.text)
|
||||
if txt: all_text.append(txt)
|
||||
@@ -625,6 +631,16 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
|
||||
cached_tokens = getattr(resp.usage_metadata, "cached_content_token_count", None)
|
||||
if cached_tokens:
|
||||
usage["cache_read_input_tokens"] = cached_tokens
|
||||
|
||||
# Fetch cache stats in the background thread to avoid blocking GUI
|
||||
cache_stats = None
|
||||
try:
|
||||
cache_stats = get_gemini_cache_stats()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
events.emit("response_received", payload={"provider": "gemini", "model": _model, "usage": usage, "round": r_idx, "cache_stats": cache_stats})
|
||||
|
||||
reason = resp.candidates[0].finish_reason.name if resp.candidates and hasattr(resp.candidates[0], "finish_reason") else "STOP"
|
||||
|
||||
_append_comms("IN", "response", {"round": r_idx, "stop_reason": reason, "text": txt, "tool_calls": [{"name": c.name, "args": dict(c.args)} for c in calls], "usage": usage})
|
||||
@@ -658,6 +674,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
|
||||
f_resps, log = [], []
|
||||
for i, fc in enumerate(calls):
|
||||
name, args = fc.name, dict(fc.args)
|
||||
events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
|
||||
if name in mcp_client.TOOL_NAMES:
|
||||
_append_comms("OUT", "tool_call", {"name": name, "args": args})
|
||||
out = mcp_client.dispatch(name, args)
|
||||
@@ -677,6 +694,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
|
||||
|
||||
f_resps.append(types.Part.from_function_response(name=name, response={"output": out}))
|
||||
log.append({"tool_use_id": name, "content": out})
|
||||
events.emit("tool_execution", payload={"status": "completed", "tool": name, "result": out, "round": r_idx})
|
||||
|
||||
_append_comms("OUT", "tool_result_send", {"results": log})
|
||||
payload = f_resps
|
||||
@@ -994,6 +1012,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
|
||||
def _strip_private_keys(history):
|
||||
return [{k: v for k, v in m.items() if not k.startswith("_")} for m in history]
|
||||
|
||||
events.emit("request_start", payload={"provider": "anthropic", "model": _model, "round": round_idx})
|
||||
response = _anthropic_client.messages.create(
|
||||
model=_model,
|
||||
max_tokens=_max_tokens,
|
||||
@@ -1032,6 +1051,8 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
|
||||
if cache_read is not None:
|
||||
usage_dict["cache_read_input_tokens"] = cache_read
|
||||
|
||||
events.emit("response_received", payload={"provider": "anthropic", "model": _model, "usage": usage_dict, "round": round_idx})
|
||||
|
||||
_append_comms("IN", "response", {
|
||||
"round": round_idx,
|
||||
"stop_reason": response.stop_reason,
|
||||
@@ -1055,6 +1076,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
|
||||
b_name = getattr(block, "name", None)
|
||||
b_id = getattr(block, "id", "")
|
||||
b_input = getattr(block, "input", {})
|
||||
events.emit("tool_execution", payload={"status": "started", "tool": b_name, "args": b_input, "round": round_idx})
|
||||
if b_name in mcp_client.TOOL_NAMES:
|
||||
_append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
|
||||
output = mcp_client.dispatch(b_name, b_input)
|
||||
@@ -1064,6 +1086,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
|
||||
"tool_use_id": b_id,
|
||||
"content": output,
|
||||
})
|
||||
events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
|
||||
elif b_name == TOOL_NAME:
|
||||
script = b_input.get("script", "")
|
||||
_append_comms("OUT", "tool_call", {
|
||||
@@ -1082,6 +1105,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
|
||||
"tool_use_id": b_id,
|
||||
"content": output,
|
||||
})
|
||||
events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
|
||||
|
||||
# Refresh file context after tool calls — only inject CHANGED files
|
||||
if file_items:
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
# Implementation Plan: Event-Driven API Metrics Updates
|
||||
|
||||
## Phase 1: Event Infrastructure & Test Setup [checkpoint: 776f4e4]
|
||||
Define the event mechanism and create baseline tests to ensure we don't break data accuracy.
|
||||
|
||||
- [x] Task: Create `tests/test_api_events.py` to verify the new event emission logic in isolation. cd3f3c8
|
||||
- [x] Task: Implement a simple `EventEmitter` or `Signal` class (if not already present) to handle decoupled communication. cd3f3c8
|
||||
- [x] Task: Instrument `ai_client.py` with the event system, adding placeholders for the key lifecycle events. cd3f3c8
|
||||
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Event Infrastructure & Test Setup' (Protocol in workflow.md)
|
||||
|
||||
## Phase 2: Client Instrumentation (API Lifecycle) [checkpoint: e24664c]
|
||||
Update the AI client to emit events during actual API interactions.
|
||||
|
||||
- [x] Task: Implement event emission for Gemini and Anthropic request/response cycles in `ai_client.py`. 20ebab5
|
||||
- [x] Task: Implement event emission for tool/function calls and stream processing. 20ebab5
|
||||
- [x] Task: Verify via tests that events carry the correct payload (token counts, session metadata). 20ebab5
|
||||
- [x] Task: Conductor - User Manual Verification 'Phase 2: Client Instrumentation (API Lifecycle)' (Protocol in workflow.md) e24664c
|
||||
|
||||
## Phase 3: GUI Integration & Decoupling [checkpoint: 8caebbd]
|
||||
Connect the UI to the event system and remove polling logic.
|
||||
|
||||
- [x] Task: Update `gui.py` to subscribe to API events and trigger metrics UI refreshes only upon event receipt. 2dd6145
|
||||
- [x] Task: Audit the `gui.py` render loop and remove all per-frame metrics calculations or display updates. 2dd6145
|
||||
- [x] Task: Verify that UI performance improves (reduced CPU/frame time) while metrics remain accurate. 2dd6145
|
||||
- [x] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Decoupling' (Protocol in workflow.md) 8caebbd
|
||||
|
||||
## Phase: Review Fixes
|
||||
- [x] Task: Apply review suggestions 66f728e
|
||||
@@ -14,4 +14,7 @@
|
||||
## Configuration & Tooling
|
||||
- **tomli-w:** For writing TOML configuration files.
|
||||
- **psutil:** For system and process monitoring (CPU/Memory telemetry).
|
||||
- **uv:** An extremely fast Python package and project manager.
|
||||
- **uv:** An extremely fast Python package and project manager.
|
||||
|
||||
## Architectural Patterns
|
||||
- **Event-Driven Metrics:** Uses a custom `EventEmitter` to decouple API lifecycle events from UI rendering, improving performance and responsiveness.
|
||||
@@ -17,10 +17,6 @@ This file tracks all major tracks for the project. Each track has its own detail
|
||||
- [x] **Track: Live GUI Testing Infrastructure**
|
||||
*Link: [./tracks/live_gui_testing_20260223/](./tracks/live_gui_testing_20260223/)*
|
||||
|
||||
---
|
||||
|
||||
- [ ] **Track: Event-Driven API Metrics Updates**
|
||||
*Link: [./tracks/event_driven_metrics_20260223/](./tracks/event_driven_metrics_20260223/)*
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
# Implementation Plan: Event-Driven API Metrics Updates
|
||||
|
||||
## Phase 1: Event Infrastructure & Test Setup
|
||||
Define the event mechanism and create baseline tests to ensure we don't break data accuracy.
|
||||
|
||||
- [ ] Task: Create `tests/test_api_events.py` to verify the new event emission logic in isolation.
|
||||
- [ ] Task: Implement a simple `EventEmitter` or `Signal` class (if not already present) to handle decoupled communication.
|
||||
- [ ] Task: Instrument `ai_client.py` with the event system, adding placeholders for the key lifecycle events.
|
||||
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Event Infrastructure & Test Setup' (Protocol in workflow.md)
|
||||
|
||||
## Phase 2: Client Instrumentation (API Lifecycle)
|
||||
Update the AI client to emit events during actual API interactions.
|
||||
|
||||
- [ ] Task: Implement event emission for Gemini and Anthropic request/response cycles in `ai_client.py`.
|
||||
- [ ] Task: Implement event emission for tool/function calls and stream processing.
|
||||
- [ ] Task: Verify via tests that events carry the correct payload (token counts, session metadata).
|
||||
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Client Instrumentation (API Lifecycle)' (Protocol in workflow.md)
|
||||
|
||||
## Phase 3: GUI Integration & Decoupling
|
||||
Connect the UI to the event system and remove polling logic.
|
||||
|
||||
- [ ] Task: Update `gui.py` to subscribe to API events and trigger metrics UI refreshes only upon event receipt.
|
||||
- [ ] Task: Audit the `gui.py` render loop and remove all per-frame metrics calculations or display updates.
|
||||
- [ ] Task: Verify that UI performance improves (reduced CPU/frame time) while metrics remain accurate.
|
||||
- [ ] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Decoupling' (Protocol in workflow.md)
|
||||
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Decoupled event emission system for cross-module communication.
|
||||
"""
|
||||
from typing import Callable, Any, Dict, List
|
||||
|
||||
class EventEmitter:
|
||||
"""
|
||||
Simple event emitter for decoupled communication between modules.
|
||||
"""
|
||||
def __init__(self):
|
||||
"""Initializes the EventEmitter with an empty listener map."""
|
||||
self._listeners: Dict[str, List[Callable]] = {}
|
||||
|
||||
def on(self, event_name: str, callback: Callable):
|
||||
"""
|
||||
Registers a callback for a specific event.
|
||||
|
||||
Args:
|
||||
event_name: The name of the event to listen for.
|
||||
callback: The function to call when the event is emitted.
|
||||
"""
|
||||
if event_name not in self._listeners:
|
||||
self._listeners[event_name] = []
|
||||
self._listeners[event_name].append(callback)
|
||||
|
||||
def emit(self, event_name: str, *args: Any, **kwargs: Any):
|
||||
"""
|
||||
Emits an event, calling all registered callbacks.
|
||||
|
||||
Args:
|
||||
event_name: The name of the event to emit.
|
||||
*args: Positional arguments to pass to callbacks.
|
||||
**kwargs: Keyword arguments to pass to callbacks.
|
||||
"""
|
||||
if event_name in self._listeners:
|
||||
for callback in self._listeners[event_name]:
|
||||
callback(*args, **kwargs)
|
||||
@@ -496,6 +496,11 @@ class App:
|
||||
self._is_script_blinking = False
|
||||
self._script_blink_start_time = 0.0
|
||||
|
||||
# Subscribe to API lifecycle events
|
||||
ai_client.events.on("request_start", self._on_api_event)
|
||||
ai_client.events.on("response_received", self._on_api_event)
|
||||
ai_client.events.on("tool_execution", self._on_api_event)
|
||||
|
||||
self.perf_monitor = PerformanceMonitor()
|
||||
self.perf_history = {
|
||||
"frame_time": [0.0] * 100,
|
||||
@@ -822,40 +827,45 @@ class App:
|
||||
total = usage["input_tokens"] + usage["output_tokens"]
|
||||
dpg.set_value("ai_token_usage", f"Tokens: {total} (In: {usage['input_tokens']} Out: {usage['output_tokens']})")
|
||||
|
||||
def _update_telemetry_panel(self):
|
||||
"""Updates the token budget visualizer in the Provider panel."""
|
||||
# Update history bleed stats for all providers (throttled)
|
||||
now = time.time()
|
||||
if now - self._last_bleed_update_time > 2.0:
|
||||
self._last_bleed_update_time = now
|
||||
stats = ai_client.get_history_bleed_stats()
|
||||
if dpg.does_item_exist("token_budget_bar"):
|
||||
percentage = stats.get("percentage", 0.0)
|
||||
dpg.set_value("token_budget_bar", percentage / 100.0 if percentage else 0.0)
|
||||
if dpg.does_item_exist("token_budget_label"):
|
||||
current = stats.get("current", 0)
|
||||
limit = stats.get("limit", 0)
|
||||
dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")
|
||||
def _on_api_event(self, *args, **kwargs):
|
||||
"""Callback for ai_client events. Queues a telemetry refresh on the main thread."""
|
||||
payload = kwargs.get("payload", {})
|
||||
with self._pending_gui_tasks_lock:
|
||||
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
|
||||
|
||||
# Update Gemini-specific cache stats (throttled with diagnostics)
|
||||
if now - self._last_diag_update_time > 10.0:
|
||||
self._last_diag_update_time = now
|
||||
|
||||
if dpg.does_item_exist("gemini_cache_label"):
|
||||
if self.current_provider == "gemini":
|
||||
try:
|
||||
cache_stats = ai_client.get_gemini_cache_stats()
|
||||
count = cache_stats.get("cache_count", 0)
|
||||
size_bytes = cache_stats.get("total_size_bytes", 0)
|
||||
size_kb = size_bytes / 1024.0
|
||||
text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
|
||||
dpg.set_value("gemini_cache_label", text)
|
||||
dpg.configure_item("gemini_cache_label", show=True)
|
||||
except Exception as e:
|
||||
# If the API call fails, just hide the label
|
||||
dpg.configure_item("gemini_cache_label", show=False)
|
||||
else:
|
||||
dpg.configure_item("gemini_cache_label", show=False)
|
||||
def _refresh_api_metrics(self, payload: dict = None):
|
||||
"""Updates the token budget and cache stats visualizers."""
|
||||
payload = payload or {}
|
||||
self._last_bleed_update_time = time.time()
|
||||
|
||||
# History bleed
|
||||
stats = ai_client.get_history_bleed_stats()
|
||||
if dpg.does_item_exist("token_budget_bar"):
|
||||
percentage = stats.get("percentage", 0.0)
|
||||
dpg.set_value("token_budget_bar", percentage / 100.0 if percentage else 0.0)
|
||||
if dpg.does_item_exist("token_budget_label"):
|
||||
current = stats.get("current", 0)
|
||||
limit = stats.get("limit", 0)
|
||||
dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")
|
||||
|
||||
# Gemini cache - Use payload data to avoid blocking the main thread with network calls
|
||||
if dpg.does_item_exist("gemini_cache_label"):
|
||||
cache_stats = payload.get("cache_stats")
|
||||
if cache_stats:
|
||||
count = cache_stats.get("cache_count", 0)
|
||||
size_bytes = cache_stats.get("total_size_bytes", 0)
|
||||
size_kb = size_bytes / 1024.0
|
||||
text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
|
||||
dpg.set_value("gemini_cache_label", text)
|
||||
dpg.configure_item("gemini_cache_label", show=True)
|
||||
elif self.current_provider != "gemini":
|
||||
dpg.configure_item("gemini_cache_label", show=False)
|
||||
# Note: We don't hide it if no stats are in payload,
|
||||
# to avoid flickering during tool/chunk events that don't include stats.
|
||||
|
||||
def _update_performance_diagnostics(self):
|
||||
"""Updates performance diagnostics displays (throttled)."""
|
||||
now = time.time()
|
||||
|
||||
# Update Diagnostics panel (throttled for smoothness)
|
||||
if now - self._last_perf_update_time > 0.5:
|
||||
@@ -2221,6 +2231,34 @@ class App:
|
||||
dpg.add_line_series(list(range(100)), self.perf_history["cpu"], label="cpu usage", tag="perf_cpu_plot")
|
||||
dpg.set_axis_limits("axis_cpu_y", 0, 100)
|
||||
|
||||
def _process_pending_gui_tasks(self):
|
||||
"""Processes tasks queued from background threads on the main thread."""
|
||||
if not self._pending_gui_tasks:
|
||||
return
|
||||
|
||||
with self._pending_gui_tasks_lock:
|
||||
gui_tasks = self._pending_gui_tasks[:]
|
||||
self._pending_gui_tasks.clear()
|
||||
|
||||
for task in gui_tasks:
|
||||
try:
|
||||
action = task.get("action")
|
||||
if action == "set_value":
|
||||
item = task.get("item")
|
||||
val = task.get("value")
|
||||
if item and dpg.does_item_exist(item):
|
||||
dpg.set_value(item, val)
|
||||
elif action == "click":
|
||||
item = task.get("item")
|
||||
if item and dpg.does_item_exist(item):
|
||||
cb = dpg.get_item_callback(item)
|
||||
if cb:
|
||||
cb()
|
||||
elif action == "refresh_api_metrics":
|
||||
self._refresh_api_metrics(task.get("payload", {}))
|
||||
except Exception as e:
|
||||
print(f"Error executing GUI hook task: {e}")
|
||||
|
||||
def run(self):
|
||||
dpg.create_context()
|
||||
dpg.configure_app(docking=True, docking_space=True, init_file="dpg_layout.ini")
|
||||
@@ -2272,25 +2310,7 @@ class App:
|
||||
|
||||
# Process queued API GUI tasks
|
||||
self.perf_monitor.start_component("GUI_Tasks")
|
||||
with self._pending_gui_tasks_lock:
|
||||
gui_tasks = self._pending_gui_tasks[:]
|
||||
self._pending_gui_tasks.clear()
|
||||
for task in gui_tasks:
|
||||
try:
|
||||
action = task.get("action")
|
||||
if action == "set_value":
|
||||
item = task.get("item")
|
||||
val = task.get("value")
|
||||
if item and dpg.does_item_exist(item):
|
||||
dpg.set_value(item, val)
|
||||
elif action == "click":
|
||||
item = task.get("item")
|
||||
if item and dpg.does_item_exist(item):
|
||||
cb = dpg.get_item_callback(item)
|
||||
if cb:
|
||||
cb()
|
||||
except Exception as e:
|
||||
print(f"Error executing GUI hook task: {e}")
|
||||
self._process_pending_gui_tasks()
|
||||
self.perf_monitor.end_component("GUI_Tasks")
|
||||
|
||||
# Handle retro arcade blinking effect
|
||||
@@ -2394,7 +2414,7 @@ class App:
|
||||
self.perf_monitor.end_component("Comms")
|
||||
|
||||
self.perf_monitor.start_component("Telemetry")
|
||||
self._update_telemetry_panel()
|
||||
self._update_performance_diagnostics()
|
||||
self.perf_monitor.end_component("Telemetry")
|
||||
|
||||
self.perf_monitor.end_frame()
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
import ai_client
|
||||
|
||||
def test_ai_client_event_emitter_exists():
|
||||
# This should fail initially because 'events' won't exist on ai_client
|
||||
assert hasattr(ai_client, 'events')
|
||||
assert ai_client.events is not None
|
||||
|
||||
def test_event_emission():
|
||||
# We'll expect these event names based on the spec
|
||||
mock_callback = MagicMock()
|
||||
ai_client.events.on("request_start", mock_callback)
|
||||
|
||||
# Trigger something that should emit the event (once implemented)
|
||||
# For now, we just test the emitter itself if we were to call it manually
|
||||
ai_client.events.emit("request_start", payload={"model": "test"})
|
||||
|
||||
mock_callback.assert_called_once_with(payload={"model": "test"})
|
||||
|
||||
def test_send_emits_events():
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# We need to mock _ensure_gemini_client and the chat object it creates
|
||||
with patch("ai_client._ensure_gemini_client"), \
|
||||
patch("ai_client._gemini_client") as mock_client, \
|
||||
patch("ai_client._gemini_chat") as mock_chat:
|
||||
|
||||
# Setup mock response
|
||||
mock_response = MagicMock()
|
||||
mock_response.candidates = []
|
||||
# Explicitly set usage_metadata as a mock with integer values
|
||||
mock_usage = MagicMock()
|
||||
mock_usage.prompt_token_count = 10
|
||||
mock_usage.candidates_token_count = 5
|
||||
mock_usage.cached_content_token_count = None
|
||||
mock_response.usage_metadata = mock_usage
|
||||
mock_chat.send_message.return_value = mock_response
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
|
||||
ai_client.set_provider("gemini", "gemini-flash")
|
||||
|
||||
start_callback = MagicMock()
|
||||
response_callback = MagicMock()
|
||||
|
||||
ai_client.events.on("request_start", start_callback)
|
||||
ai_client.events.on("response_received", response_callback)
|
||||
|
||||
# We need to bypass the context changed check or set it up
|
||||
ai_client.send("context", "message")
|
||||
|
||||
assert start_callback.called
|
||||
assert response_callback.called
|
||||
|
||||
# Check payload
|
||||
args, kwargs = start_callback.call_args
|
||||
assert kwargs['payload']['provider'] == 'gemini'
|
||||
|
||||
def test_send_emits_tool_events():
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
with patch("ai_client._ensure_gemini_client"), \
|
||||
patch("ai_client._gemini_client") as mock_client, \
|
||||
patch("ai_client._gemini_chat") as mock_chat, \
|
||||
patch("mcp_client.dispatch") as mock_dispatch:
|
||||
|
||||
# 1. Setup mock response with a tool call
|
||||
mock_fc = MagicMock()
|
||||
mock_fc.name = "read_file"
|
||||
mock_fc.args = {"path": "test.txt"}
|
||||
|
||||
mock_response_with_tool = MagicMock()
|
||||
mock_response_with_tool.candidates = [MagicMock()]
|
||||
mock_part = MagicMock()
|
||||
mock_part.text = "tool call text"
|
||||
mock_part.function_call = mock_fc
|
||||
mock_response_with_tool.candidates[0].content.parts = [mock_part]
|
||||
mock_response_with_tool.candidates[0].finish_reason.name = "STOP"
|
||||
|
||||
# Setup mock usage
|
||||
mock_usage = MagicMock()
|
||||
mock_usage.prompt_token_count = 10
|
||||
mock_usage.candidates_token_count = 5
|
||||
mock_usage.cached_content_token_count = None
|
||||
mock_response_with_tool.usage_metadata = mock_usage
|
||||
|
||||
# 2. Setup second mock response (final answer)
|
||||
mock_response_final = MagicMock()
|
||||
mock_response_final.candidates = []
|
||||
mock_response_final.usage_metadata = mock_usage
|
||||
|
||||
mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final]
|
||||
mock_dispatch.return_value = "file content"
|
||||
|
||||
ai_client.set_provider("gemini", "gemini-flash")
|
||||
|
||||
tool_callback = MagicMock()
|
||||
ai_client.events.on("tool_execution", tool_callback)
|
||||
|
||||
ai_client.send("context", "message")
|
||||
|
||||
# Should be called twice: once for 'started', once for 'completed'
|
||||
assert tool_callback.call_count == 2
|
||||
|
||||
# Check 'started' call
|
||||
args, kwargs = tool_callback.call_args_list[0]
|
||||
assert kwargs['payload']['status'] == 'started'
|
||||
assert kwargs['payload']['tool'] == 'read_file'
|
||||
|
||||
# Check 'completed' call
|
||||
args, kwargs = tool_callback.call_args_list[1]
|
||||
assert kwargs['payload']['status'] == 'completed'
|
||||
assert kwargs['payload']['result'] == 'file content'
|
||||
@@ -53,7 +53,7 @@ def test_diagnostics_panel_updates(app_instance):
|
||||
|
||||
# We also need to mock ai_client stats
|
||||
with patch('ai_client.get_history_bleed_stats', return_value={}):
|
||||
app_instance._update_telemetry_panel()
|
||||
app_instance._update_performance_diagnostics()
|
||||
|
||||
# Verify UI updates
|
||||
mock_set_value.assert_any_call("perf_fps_text", "100.0")
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import dearpygui.dearpygui as dpg
|
||||
import gui
|
||||
from gui import App
|
||||
import ai_client
|
||||
|
||||
@pytest.fixture
|
||||
def app_instance():
|
||||
"""
|
||||
Fixture to create an instance of the App class for testing.
|
||||
It creates a real DPG context but mocks functions that would
|
||||
render a window or block execution.
|
||||
"""
|
||||
dpg.create_context()
|
||||
|
||||
with patch('dearpygui.dearpygui.create_viewport'), \
|
||||
patch('dearpygui.dearpygui.setup_dearpygui'), \
|
||||
patch('dearpygui.dearpygui.show_viewport'), \
|
||||
patch('dearpygui.dearpygui.start_dearpygui'), \
|
||||
patch('gui.load_config', return_value={}), \
|
||||
patch('gui.PerformanceMonitor'), \
|
||||
patch('gui.shell_runner'), \
|
||||
patch('gui.project_manager'), \
|
||||
patch.object(App, '_load_active_project'), \
|
||||
patch.object(App, '_rebuild_files_list'), \
|
||||
patch.object(App, '_rebuild_shots_list'), \
|
||||
patch.object(App, '_rebuild_disc_list'), \
|
||||
patch.object(App, '_rebuild_disc_roles_list'), \
|
||||
patch.object(App, '_rebuild_discussion_selector'), \
|
||||
patch.object(App, '_refresh_project_widgets'):
|
||||
|
||||
app = App()
|
||||
yield app
|
||||
|
||||
dpg.destroy_context()
|
||||
|
||||
def test_gui_updates_on_event(app_instance):
|
||||
# Patch dependencies for the test
|
||||
with patch('dearpygui.dearpygui.set_value') as mock_set_value, \
|
||||
patch('dearpygui.dearpygui.does_item_exist', return_value=True), \
|
||||
patch('dearpygui.dearpygui.configure_item'), \
|
||||
patch('ai_client.get_history_bleed_stats') as mock_stats:
|
||||
|
||||
mock_stats.return_value = {"percentage": 50.0, "current": 500, "limit": 1000}
|
||||
|
||||
# We'll use patch.object to see if _refresh_api_metrics is called
|
||||
with patch.object(app_instance, '_refresh_api_metrics', wraps=app_instance._refresh_api_metrics) as mock_refresh:
|
||||
# Simulate event
|
||||
ai_client.events.emit("response_received", payload={})
|
||||
|
||||
# Process tasks manually
|
||||
app_instance._process_pending_gui_tasks()
|
||||
|
||||
# Verify that _refresh_api_metrics was called
|
||||
mock_refresh.assert_called_once()
|
||||
|
||||
# Verify that dpg.set_value was called for the metrics widgets
|
||||
calls = [call.args[0] for call in mock_set_value.call_args_list]
|
||||
assert "token_budget_bar" in calls
|
||||
assert "token_budget_label" in calls
|
||||
@@ -41,7 +41,7 @@ def app_instance():
|
||||
|
||||
def test_telemetry_panel_updates_correctly(app_instance):
|
||||
"""
|
||||
Tests that the _update_telemetry_panel method correctly updates
|
||||
Tests that the _update_performance_diagnostics method correctly updates
|
||||
DPG widgets based on the stats from ai_client.
|
||||
"""
|
||||
# 1. Set the provider to anthropic
|
||||
@@ -64,7 +64,7 @@ def test_telemetry_panel_updates_correctly(app_instance):
|
||||
patch('dearpygui.dearpygui.does_item_exist', return_value=True) as mock_does_item_exist:
|
||||
|
||||
# 4. Call the method under test
|
||||
app_instance._update_telemetry_panel()
|
||||
app_instance._refresh_api_metrics()
|
||||
|
||||
# 5. Assert the results
|
||||
mock_get_stats.assert_called_once()
|
||||
@@ -78,7 +78,7 @@ def test_telemetry_panel_updates_correctly(app_instance):
|
||||
|
||||
def test_cache_data_display_updates_correctly(app_instance):
|
||||
"""
|
||||
Tests that the _update_telemetry_panel method correctly updates the
|
||||
Tests that the _update_performance_diagnostics method correctly updates the
|
||||
GUI with Gemini cache statistics when the provider is set to Gemini.
|
||||
"""
|
||||
# 1. Set the provider to Gemini
|
||||
@@ -103,11 +103,11 @@ def test_cache_data_display_updates_correctly(app_instance):
|
||||
# We also need to mock get_history_bleed_stats as it's called in the same function
|
||||
with patch('ai_client.get_history_bleed_stats', return_value={}):
|
||||
|
||||
# 4. Call the method under test
|
||||
app_instance._update_telemetry_panel()
|
||||
# 4. Call the method under test with payload
|
||||
app_instance._refresh_api_metrics(payload={'cache_stats': mock_cache_stats})
|
||||
|
||||
# 5. Assert the results
|
||||
mock_get_cache_stats.assert_called_once()
|
||||
# mock_get_cache_stats.assert_called_once() # No longer called synchronously
|
||||
|
||||
# Check that the UI item was shown and its value was set
|
||||
mock_configure_item.assert_any_call("gemini_cache_label", show=True)
|
||||
|
||||
Reference in New Issue
Block a user