15 Commits

Author SHA1 Message Date
ed 2da1ef38af remove event driven metrics from tracks 2026-02-23 16:47:15 -05:00
ed 40fc35f176 chore(conductor): Archive track 'event_driven_metrics_20260223' 2026-02-23 16:46:20 -05:00
ed 1a428e3c6a conductor(plan): Mark task 'Apply review suggestions' as complete 2026-02-23 16:45:42 -05:00
ed 66f728e7a3 fix(conductor): Apply review suggestions for track 'event_driven_metrics_20260223' 2026-02-23 16:45:34 -05:00
ed eaaf09dc3c docs(conductor): Synchronize docs for track 'Event-Driven API Metrics Updates' 2026-02-23 16:39:46 -05:00
ed abc0639602 chore(conductor): Mark track 'Event-Driven API Metrics Updates' as complete 2026-02-23 16:39:02 -05:00
ed b792e34a64 conductor(plan): Mark Phase 3 as complete 2026-02-23 16:38:54 -05:00
ed 8caebbd226 conductor(checkpoint): Checkpoint end of Phase 3 2026-02-23 16:38:27 -05:00
ed 2dd6145bd8 feat(gui): Implement event-driven API metrics updates and decouple from render loop 2026-02-23 16:38:23 -05:00
ed 0c27aa6c6b conductor(plan): Mark Phase 2 as complete 2026-02-23 16:32:10 -05:00
ed e24664c7b2 conductor(checkpoint): Checkpoint end of Phase 2 2026-02-23 16:31:56 -05:00
ed 20ebab55a0 feat(ai_client): Emit API lifecycle and tool execution events 2026-02-23 16:31:48 -05:00
ed c44026c06c conductor(plan): Mark Phase 1 as complete 2026-02-23 16:25:48 -05:00
ed 776f4e4370 conductor(checkpoint): Checkpoint end of Phase 1 2026-02-23 16:25:38 -05:00
ed cd3f3c89ed feat(events): Add EventEmitter and instrument ai_client.py 2026-02-23 16:23:55 -05:00
14 changed files with 349 additions and 90 deletions
ai_client.py (+24)
@@ -19,6 +19,8 @@ from pathlib import Path
 import file_cache
 import mcp_client
 import google.genai
+from google.genai import types
+from events import EventEmitter

 _provider: str = "gemini"
 _model: str = "gemini-2.5-flash"
@@ -27,6 +29,9 @@ _max_tokens: int = 8192
 _history_trunc_limit: int = 8000

+# Global event emitter for API lifecycle events
+events = EventEmitter()
+
 def set_model_params(temp: float, max_tok: int, trunc_limit: int = 8000):
     global _temperature, _max_tokens, _history_trunc_limit
     _temperature = temp
@@ -616,6 +621,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
                 r["output"] = val
     for r_idx in range(MAX_TOOL_ROUNDS + 2):
+        events.emit("request_start", payload={"provider": "gemini", "model": _model, "round": r_idx})
         resp = _gemini_chat.send_message(payload)
         txt = "\n".join(p.text for c in resp.candidates if getattr(c, "content", None) for p in c.content.parts if hasattr(p, "text") and p.text)
         if txt: all_text.append(txt)
@@ -625,6 +631,16 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
         cached_tokens = getattr(resp.usage_metadata, "cached_content_token_count", None)
         if cached_tokens:
             usage["cache_read_input_tokens"] = cached_tokens
+
+        # Fetch cache stats in the background thread to avoid blocking GUI
+        cache_stats = None
+        try:
+            cache_stats = get_gemini_cache_stats()
+        except Exception:
+            pass
+
+        events.emit("response_received", payload={"provider": "gemini", "model": _model, "usage": usage, "round": r_idx, "cache_stats": cache_stats})
+
         reason = resp.candidates[0].finish_reason.name if resp.candidates and hasattr(resp.candidates[0], "finish_reason") else "STOP"
         _append_comms("IN", "response", {"round": r_idx, "stop_reason": reason, "text": txt, "tool_calls": [{"name": c.name, "args": dict(c.args)} for c in calls], "usage": usage})
@@ -658,6 +674,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
         f_resps, log = [], []
         for i, fc in enumerate(calls):
             name, args = fc.name, dict(fc.args)
+            events.emit("tool_execution", payload={"status": "started", "tool": name, "args": args, "round": r_idx})
             if name in mcp_client.TOOL_NAMES:
                 _append_comms("OUT", "tool_call", {"name": name, "args": args})
                 out = mcp_client.dispatch(name, args)
@@ -677,6 +694,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
             f_resps.append(types.Part.from_function_response(name=name, response={"output": out}))
             log.append({"tool_use_id": name, "content": out})
+            events.emit("tool_execution", payload={"status": "completed", "tool": name, "result": out, "round": r_idx})
         _append_comms("OUT", "tool_result_send", {"results": log})
         payload = f_resps
@@ -994,6 +1012,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
     def _strip_private_keys(history):
         return [{k: v for k, v in m.items() if not k.startswith("_")} for m in history]
+    events.emit("request_start", payload={"provider": "anthropic", "model": _model, "round": round_idx})
     response = _anthropic_client.messages.create(
         model=_model,
         max_tokens=_max_tokens,
@@ -1032,6 +1051,8 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
     if cache_read is not None:
         usage_dict["cache_read_input_tokens"] = cache_read
+
+    events.emit("response_received", payload={"provider": "anthropic", "model": _model, "usage": usage_dict, "round": round_idx})
     _append_comms("IN", "response", {
         "round": round_idx,
         "stop_reason": response.stop_reason,
@@ -1055,6 +1076,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
             b_name = getattr(block, "name", None)
             b_id = getattr(block, "id", "")
             b_input = getattr(block, "input", {})
+            events.emit("tool_execution", payload={"status": "started", "tool": b_name, "args": b_input, "round": round_idx})
             if b_name in mcp_client.TOOL_NAMES:
                 _append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
                 output = mcp_client.dispatch(b_name, b_input)
@@ -1064,6 +1086,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
                     "tool_use_id": b_id,
                     "content": output,
                 })
+                events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})
             elif b_name == TOOL_NAME:
                 script = b_input.get("script", "")
                 _append_comms("OUT", "tool_call", {
@@ -1082,6 +1105,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
                     "tool_use_id": b_id,
                     "content": output,
                 })
+                events.emit("tool_execution", payload={"status": "completed", "tool": b_name, "result": output, "round": round_idx})

     # Refresh file context after tool calls — only inject CHANGED files
     if file_items:
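Taken together, the hooks above define three event channels: `request_start`, `response_received`, and `tool_execution`. As a summary (not a formal schema), the payloads carried by the `emit()` calls in this diff look roughly like this; the literal values are illustrative only:

```python
# Payload shapes as emitted in ai_client.py above; values are examples.
request_start = {
    "provider": "gemini",          # or "anthropic"
    "model": "gemini-2.5-flash",
    "round": 0,                    # tool-use round index
}

response_received = {
    "provider": "gemini",
    "model": "gemini-2.5-flash",
    "usage": {"input_tokens": 10, "output_tokens": 5},  # may also carry "cache_read_input_tokens"
    "round": 0,
    "cache_stats": {"cache_count": 1, "total_size_bytes": 2048},  # Gemini only; None if the lookup failed
}

tool_execution = {
    "status": "started",           # a matching event with "completed" follows
    "tool": "read_file",
    "args": {"path": "test.txt"},  # the "completed" event carries "result" instead of "args"
    "round": 0,
}
```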
@@ -0,0 +1,28 @@
# Implementation Plan: Event-Driven API Metrics Updates
## Phase 1: Event Infrastructure & Test Setup [checkpoint: 776f4e4]
Define the event mechanism and create baseline tests to ensure we don't break data accuracy.
- [x] Task: Create `tests/test_api_events.py` to verify the new event emission logic in isolation. cd3f3c8
- [x] Task: Implement a simple `EventEmitter` or `Signal` class (if not already present) to handle decoupled communication. cd3f3c8
- [x] Task: Instrument `ai_client.py` with the event system, adding placeholders for the key lifecycle events. cd3f3c8
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Event Infrastructure & Test Setup' (Protocol in workflow.md)
## Phase 2: Client Instrumentation (API Lifecycle) [checkpoint: e24664c]
Update the AI client to emit events during actual API interactions.
- [x] Task: Implement event emission for Gemini and Anthropic request/response cycles in `ai_client.py`. 20ebab5
- [x] Task: Implement event emission for tool/function calls and stream processing. 20ebab5
- [x] Task: Verify via tests that events carry the correct payload (token counts, session metadata). 20ebab5
- [x] Task: Conductor - User Manual Verification 'Phase 2: Client Instrumentation (API Lifecycle)' (Protocol in workflow.md) e24664c
## Phase 3: GUI Integration & Decoupling [checkpoint: 8caebbd]
Connect the UI to the event system and remove polling logic.
- [x] Task: Update `gui.py` to subscribe to API events and trigger metrics UI refreshes only upon event receipt. 2dd6145
- [x] Task: Audit the `gui.py` render loop and remove all per-frame metrics calculations or display updates. 2dd6145
- [x] Task: Verify that UI performance improves (reduced CPU/frame time) while metrics remain accurate. 2dd6145
- [x] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Decoupling' (Protocol in workflow.md) 8caebbd
## Phase: Review Fixes
- [x] Task: Apply review suggestions 66f728e
+3
@@ -15,3 +15,6 @@
 - **tomli-w:** For writing TOML configuration files.
 - **psutil:** For system and process monitoring (CPU/Memory telemetry).
 - **uv:** An extremely fast Python package and project manager.
+
+## Architectural Patterns
+- **Event-Driven Metrics:** Uses a custom `EventEmitter` to decouple API lifecycle events from UI rendering, improving performance and responsiveness.
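As a rough sketch of this pattern (assuming the `EventEmitter` from `events.py`; the names `pending`, `handle_event`, and `drain` are illustrative, not the actual `gui.py` identifiers): the API worker thread emits, the handler only enqueues, and the UI thread drains the queue once per frame.

```python
import threading

from events import EventEmitter

events = EventEmitter()
pending: list[dict] = []          # tasks queued for the UI thread
pending_lock = threading.Lock()

def handle_event(payload=None):
    # Runs on the API worker thread: never touch UI state here, just enqueue.
    with pending_lock:
        pending.append(payload or {})

events.on("response_received", handle_event)

def drain():
    # Runs once per frame on the main/UI thread.
    with pending_lock:
        tasks = pending[:]
        pending.clear()
    for payload in tasks:
        print("refresh metrics with", payload)  # stand-in for the real DPG widget updates
```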
-4
@@ -17,10 +17,6 @@ This file tracks all major tracks for the project. Each track has its own detail
 - [x] **Track: Live GUI Testing Infrastructure**
   *Link: [./tracks/live_gui_testing_20260223/](./tracks/live_gui_testing_20260223/)*
----
-
-- [ ] **Track: Event-Driven API Metrics Updates**
-  *Link: [./tracks/event_driven_metrics_20260223/](./tracks/event_driven_metrics_20260223/)*
@@ -1,25 +0,0 @@
# Implementation Plan: Event-Driven API Metrics Updates
## Phase 1: Event Infrastructure & Test Setup
Define the event mechanism and create baseline tests to ensure we don't break data accuracy.
- [ ] Task: Create `tests/test_api_events.py` to verify the new event emission logic in isolation.
- [ ] Task: Implement a simple `EventEmitter` or `Signal` class (if not already present) to handle decoupled communication.
- [ ] Task: Instrument `ai_client.py` with the event system, adding placeholders for the key lifecycle events.
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Event Infrastructure & Test Setup' (Protocol in workflow.md)
## Phase 2: Client Instrumentation (API Lifecycle)
Update the AI client to emit events during actual API interactions.
- [ ] Task: Implement event emission for Gemini and Anthropic request/response cycles in `ai_client.py`.
- [ ] Task: Implement event emission for tool/function calls and stream processing.
- [ ] Task: Verify via tests that events carry the correct payload (token counts, session metadata).
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Client Instrumentation (API Lifecycle)' (Protocol in workflow.md)
## Phase 3: GUI Integration & Decoupling
Connect the UI to the event system and remove polling logic.
- [ ] Task: Update `gui.py` to subscribe to API events and trigger metrics UI refreshes only upon event receipt.
- [ ] Task: Audit the `gui.py` render loop and remove all per-frame metrics calculations or display updates.
- [ ] Task: Verify that UI performance improves (reduced CPU/frame time) while metrics remain accurate.
- [ ] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Decoupling' (Protocol in workflow.md)
events.py (+37)
@@ -0,0 +1,37 @@
"""
Decoupled event emission system for cross-module communication.
"""
from typing import Callable, Any, Dict, List
class EventEmitter:
"""
Simple event emitter for decoupled communication between modules.
"""
def __init__(self):
"""Initializes the EventEmitter with an empty listener map."""
self._listeners: Dict[str, List[Callable]] = {}
def on(self, event_name: str, callback: Callable):
"""
Registers a callback for a specific event.
Args:
event_name: The name of the event to listen for.
callback: The function to call when the event is emitted.
"""
if event_name not in self._listeners:
self._listeners[event_name] = []
self._listeners[event_name].append(callback)
def emit(self, event_name: str, *args: Any, **kwargs: Any):
"""
Emits an event, calling all registered callbacks.
Args:
event_name: The name of the event to emit.
*args: Positional arguments to pass to callbacks.
**kwargs: Keyword arguments to pass to callbacks.
"""
if event_name in self._listeners:
for callback in self._listeners[event_name]:
callback(*args, **kwargs)
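A minimal usage sketch for the emitter above (the event name and keyword-style payload mirror how `ai_client.py` and `gui.py` use it in this change set):

```python
from events import EventEmitter

events = EventEmitter()

def on_request_start(payload=None):
    # Listeners receive exactly the arguments passed to emit().
    print("request started:", payload)

events.on("request_start", on_request_start)
events.emit("request_start", payload={"provider": "gemini", "model": "gemini-2.5-flash", "round": 0})
# -> request started: {'provider': 'gemini', 'model': 'gemini-2.5-flash', 'round': 0}
```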
gui.py (+57 -37)
@@ -496,6 +496,11 @@ class App:
         self._is_script_blinking = False
         self._script_blink_start_time = 0.0

+        # Subscribe to API lifecycle events
+        ai_client.events.on("request_start", self._on_api_event)
+        ai_client.events.on("response_received", self._on_api_event)
+        ai_client.events.on("tool_execution", self._on_api_event)
+
         self.perf_monitor = PerformanceMonitor()
         self.perf_history = {
             "frame_time": [0.0] * 100,
@@ -822,12 +827,18 @@ class App:
         total = usage["input_tokens"] + usage["output_tokens"]
         dpg.set_value("ai_token_usage", f"Tokens: {total} (In: {usage['input_tokens']} Out: {usage['output_tokens']})")

-    def _update_telemetry_panel(self):
-        """Updates the token budget visualizer in the Provider panel."""
-        # Update history bleed stats for all providers (throttled)
-        now = time.time()
-        if now - self._last_bleed_update_time > 2.0:
-            self._last_bleed_update_time = now
+    def _on_api_event(self, *args, **kwargs):
+        """Callback for ai_client events. Queues a telemetry refresh on the main thread."""
+        payload = kwargs.get("payload", {})
+        with self._pending_gui_tasks_lock:
+            self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
+
+    def _refresh_api_metrics(self, payload: dict = None):
+        """Updates the token budget and cache stats visualizers."""
+        payload = payload or {}
+        self._last_bleed_update_time = time.time()
+        # History bleed
         stats = ai_client.get_history_bleed_stats()
         if dpg.does_item_exist("token_budget_bar"):
             percentage = stats.get("percentage", 0.0)
@@ -837,25 +848,24 @@ class App:
             limit = stats.get("limit", 0)
             dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")

-        # Update Gemini-specific cache stats (throttled with diagnostics)
-        if now - self._last_diag_update_time > 10.0:
-            self._last_diag_update_time = now
+        # Gemini cache - Use payload data to avoid blocking the main thread with network calls
         if dpg.does_item_exist("gemini_cache_label"):
-            if self.current_provider == "gemini":
-                try:
-                    cache_stats = ai_client.get_gemini_cache_stats()
+            cache_stats = payload.get("cache_stats")
+            if cache_stats:
                 count = cache_stats.get("cache_count", 0)
                 size_bytes = cache_stats.get("total_size_bytes", 0)
                 size_kb = size_bytes / 1024.0
                 text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
                 dpg.set_value("gemini_cache_label", text)
                 dpg.configure_item("gemini_cache_label", show=True)
-                except Exception as e:
-                    # If the API call fails, just hide the label
-                    dpg.configure_item("gemini_cache_label", show=False)
-            else:
+            elif self.current_provider != "gemini":
                 dpg.configure_item("gemini_cache_label", show=False)
+            # Note: We don't hide it if no stats are in payload,
+            # to avoid flickering during tool/chunk events that don't include stats.
+
+    def _update_performance_diagnostics(self):
+        """Updates performance diagnostics displays (throttled)."""
+        now = time.time()

         # Update Diagnostics panel (throttled for smoothness)
         if now - self._last_perf_update_time > 0.5:
@@ -2221,6 +2231,34 @@ class App:
             dpg.add_line_series(list(range(100)), self.perf_history["cpu"], label="cpu usage", tag="perf_cpu_plot")
             dpg.set_axis_limits("axis_cpu_y", 0, 100)

+    def _process_pending_gui_tasks(self):
+        """Processes tasks queued from background threads on the main thread."""
+        if not self._pending_gui_tasks:
+            return
+        with self._pending_gui_tasks_lock:
+            gui_tasks = self._pending_gui_tasks[:]
+            self._pending_gui_tasks.clear()
+        for task in gui_tasks:
+            try:
+                action = task.get("action")
+                if action == "set_value":
+                    item = task.get("item")
+                    val = task.get("value")
+                    if item and dpg.does_item_exist(item):
+                        dpg.set_value(item, val)
+                elif action == "click":
+                    item = task.get("item")
+                    if item and dpg.does_item_exist(item):
+                        cb = dpg.get_item_callback(item)
+                        if cb:
+                            cb()
+                elif action == "refresh_api_metrics":
+                    self._refresh_api_metrics(task.get("payload", {}))
+            except Exception as e:
+                print(f"Error executing GUI hook task: {e}")
+
     def run(self):
         dpg.create_context()
         dpg.configure_app(docking=True, docking_space=True, init_file="dpg_layout.ini")
@@ -2272,25 +2310,7 @@ class App:
             # Process queued API GUI tasks
             self.perf_monitor.start_component("GUI_Tasks")
-            with self._pending_gui_tasks_lock:
-                gui_tasks = self._pending_gui_tasks[:]
-                self._pending_gui_tasks.clear()
-            for task in gui_tasks:
-                try:
-                    action = task.get("action")
-                    if action == "set_value":
-                        item = task.get("item")
-                        val = task.get("value")
-                        if item and dpg.does_item_exist(item):
-                            dpg.set_value(item, val)
-                    elif action == "click":
-                        item = task.get("item")
-                        if item and dpg.does_item_exist(item):
-                            cb = dpg.get_item_callback(item)
-                            if cb:
-                                cb()
-                except Exception as e:
-                    print(f"Error executing GUI hook task: {e}")
+            self._process_pending_gui_tasks()
             self.perf_monitor.end_component("GUI_Tasks")

             # Handle retro arcade blinking effect
@@ -2394,7 +2414,7 @@ class App:
             self.perf_monitor.end_component("Comms")

             self.perf_monitor.start_component("Telemetry")
-            self._update_telemetry_panel()
+            self._update_performance_diagnostics()
             self.perf_monitor.end_component("Telemetry")
             self.perf_monitor.end_frame()
tests/test_api_events.py (+114)
@@ -0,0 +1,114 @@
import pytest
from unittest.mock import MagicMock

import ai_client


def test_ai_client_event_emitter_exists():
    # This should fail initially because 'events' won't exist on ai_client
    assert hasattr(ai_client, 'events')
    assert ai_client.events is not None


def test_event_emission():
    # We'll expect these event names based on the spec
    mock_callback = MagicMock()
    ai_client.events.on("request_start", mock_callback)

    # Trigger something that should emit the event (once implemented)
    # For now, we just test the emitter itself if we were to call it manually
    ai_client.events.emit("request_start", payload={"model": "test"})
    mock_callback.assert_called_once_with(payload={"model": "test"})


def test_send_emits_events():
    from unittest.mock import patch, MagicMock

    # We need to mock _ensure_gemini_client and the chat object it creates
    with patch("ai_client._ensure_gemini_client"), \
         patch("ai_client._gemini_client") as mock_client, \
         patch("ai_client._gemini_chat") as mock_chat:

        # Setup mock response
        mock_response = MagicMock()
        mock_response.candidates = []

        # Explicitly set usage_metadata as a mock with integer values
        mock_usage = MagicMock()
        mock_usage.prompt_token_count = 10
        mock_usage.candidates_token_count = 5
        mock_usage.cached_content_token_count = None
        mock_response.usage_metadata = mock_usage

        mock_chat.send_message.return_value = mock_response
        mock_client.chats.create.return_value = mock_chat

        ai_client.set_provider("gemini", "gemini-flash")

        start_callback = MagicMock()
        response_callback = MagicMock()
        ai_client.events.on("request_start", start_callback)
        ai_client.events.on("response_received", response_callback)

        # We need to bypass the context changed check or set it up
        ai_client.send("context", "message")

        assert start_callback.called
        assert response_callback.called

        # Check payload
        args, kwargs = start_callback.call_args
        assert kwargs['payload']['provider'] == 'gemini'


def test_send_emits_tool_events():
    from unittest.mock import patch, MagicMock

    with patch("ai_client._ensure_gemini_client"), \
         patch("ai_client._gemini_client") as mock_client, \
         patch("ai_client._gemini_chat") as mock_chat, \
         patch("mcp_client.dispatch") as mock_dispatch:

        # 1. Setup mock response with a tool call
        mock_fc = MagicMock()
        mock_fc.name = "read_file"
        mock_fc.args = {"path": "test.txt"}

        mock_response_with_tool = MagicMock()
        mock_response_with_tool.candidates = [MagicMock()]
        mock_part = MagicMock()
        mock_part.text = "tool call text"
        mock_part.function_call = mock_fc
        mock_response_with_tool.candidates[0].content.parts = [mock_part]
        mock_response_with_tool.candidates[0].finish_reason.name = "STOP"

        # Setup mock usage
        mock_usage = MagicMock()
        mock_usage.prompt_token_count = 10
        mock_usage.candidates_token_count = 5
        mock_usage.cached_content_token_count = None
        mock_response_with_tool.usage_metadata = mock_usage

        # 2. Setup second mock response (final answer)
        mock_response_final = MagicMock()
        mock_response_final.candidates = []
        mock_response_final.usage_metadata = mock_usage

        mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final]
        mock_dispatch.return_value = "file content"

        ai_client.set_provider("gemini", "gemini-flash")

        tool_callback = MagicMock()
        ai_client.events.on("tool_execution", tool_callback)

        ai_client.send("context", "message")

        # Should be called twice: once for 'started', once for 'completed'
        assert tool_callback.call_count == 2

        # Check 'started' call
        args, kwargs = tool_callback.call_args_list[0]
        assert kwargs['payload']['status'] == 'started'
        assert kwargs['payload']['tool'] == 'read_file'

        # Check 'completed' call
        args, kwargs = tool_callback.call_args_list[1]
        assert kwargs['payload']['status'] == 'completed'
        assert kwargs['payload']['result'] == 'file content'
+1 -1
@@ -53,7 +53,7 @@ def test_diagnostics_panel_updates(app_instance):
     # We also need to mock ai_client stats
     with patch('ai_client.get_history_bleed_stats', return_value={}):
-        app_instance._update_telemetry_panel()
+        app_instance._update_performance_diagnostics()

     # Verify UI updates
     mock_set_value.assert_any_call("perf_fps_text", "100.0")
+62
@@ -0,0 +1,62 @@
import pytest
from unittest.mock import MagicMock, patch
import dearpygui.dearpygui as dpg

import gui
from gui import App
import ai_client


@pytest.fixture
def app_instance():
    """
    Fixture to create an instance of the App class for testing.
    It creates a real DPG context but mocks functions that would
    render a window or block execution.
    """
    dpg.create_context()
    with patch('dearpygui.dearpygui.create_viewport'), \
         patch('dearpygui.dearpygui.setup_dearpygui'), \
         patch('dearpygui.dearpygui.show_viewport'), \
         patch('dearpygui.dearpygui.start_dearpygui'), \
         patch('gui.load_config', return_value={}), \
         patch('gui.PerformanceMonitor'), \
         patch('gui.shell_runner'), \
         patch('gui.project_manager'), \
         patch.object(App, '_load_active_project'), \
         patch.object(App, '_rebuild_files_list'), \
         patch.object(App, '_rebuild_shots_list'), \
         patch.object(App, '_rebuild_disc_list'), \
         patch.object(App, '_rebuild_disc_roles_list'), \
         patch.object(App, '_rebuild_discussion_selector'), \
         patch.object(App, '_refresh_project_widgets'):
        app = App()
        yield app
    dpg.destroy_context()


def test_gui_updates_on_event(app_instance):
    # Patch dependencies for the test
    with patch('dearpygui.dearpygui.set_value') as mock_set_value, \
         patch('dearpygui.dearpygui.does_item_exist', return_value=True), \
         patch('dearpygui.dearpygui.configure_item'), \
         patch('ai_client.get_history_bleed_stats') as mock_stats:

        mock_stats.return_value = {"percentage": 50.0, "current": 500, "limit": 1000}

        # We'll use patch.object to see if _refresh_api_metrics is called
        with patch.object(app_instance, '_refresh_api_metrics', wraps=app_instance._refresh_api_metrics) as mock_refresh:
            # Simulate event
            ai_client.events.emit("response_received", payload={})

            # Process tasks manually
            app_instance._process_pending_gui_tasks()

            # Verify that _refresh_api_metrics was called
            mock_refresh.assert_called_once()

            # Verify that dpg.set_value was called for the metrics widgets
            calls = [call.args[0] for call in mock_set_value.call_args_list]
            assert "token_budget_bar" in calls
            assert "token_budget_label" in calls
+6 -6
@@ -41,7 +41,7 @@ def app_instance():
 def test_telemetry_panel_updates_correctly(app_instance):
     """
-    Tests that the _update_telemetry_panel method correctly updates
+    Tests that the _update_performance_diagnostics method correctly updates
     DPG widgets based on the stats from ai_client.
     """
     # 1. Set the provider to anthropic
@@ -64,7 +64,7 @@ def test_telemetry_panel_updates_correctly(app_instance):
          patch('dearpygui.dearpygui.does_item_exist', return_value=True) as mock_does_item_exist:

         # 4. Call the method under test
-        app_instance._update_telemetry_panel()
+        app_instance._refresh_api_metrics()

         # 5. Assert the results
         mock_get_stats.assert_called_once()
@@ -78,7 +78,7 @@
 def test_cache_data_display_updates_correctly(app_instance):
     """
-    Tests that the _update_telemetry_panel method correctly updates the
+    Tests that the _update_performance_diagnostics method correctly updates the
     GUI with Gemini cache statistics when the provider is set to Gemini.
     """
     # 1. Set the provider to Gemini
@@ -103,11 +103,11 @@ def test_cache_data_display_updates_correctly(app_instance):
     # We also need to mock get_history_bleed_stats as it's called in the same function
     with patch('ai_client.get_history_bleed_stats', return_value={}):
-        # 4. Call the method under test
-        app_instance._update_telemetry_panel()
+        # 4. Call the method under test with payload
+        app_instance._refresh_api_metrics(payload={'cache_stats': mock_cache_stats})

         # 5. Assert the results
-        mock_get_cache_stats.assert_called_once()
+        # mock_get_cache_stats.assert_called_once() # No longer called synchronously

         # Check that the UI item was shown and its value was set
         mock_configure_item.assert_any_call("gemini_cache_label", show=True)