fix(conductor): Apply review suggestions for track 'event_driven_metrics_20260223'
This commit is contained in:
10
ai_client.py
10
ai_client.py
@@ -632,7 +632,15 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
|
|||||||
if cached_tokens:
|
if cached_tokens:
|
||||||
usage["cache_read_input_tokens"] = cached_tokens
|
usage["cache_read_input_tokens"] = cached_tokens
|
||||||
|
|
||||||
events.emit("response_received", payload={"provider": "gemini", "model": _model, "usage": usage, "round": r_idx})
|
# Fetch cache stats in the background thread to avoid blocking GUI
|
||||||
|
cache_stats = None
|
||||||
|
try:
|
||||||
|
cache_stats = get_gemini_cache_stats()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
events.emit("response_received", payload={"provider": "gemini", "model": _model, "usage": usage, "round": r_idx, "cache_stats": cache_stats})
|
||||||
|
|
||||||
reason = resp.candidates[0].finish_reason.name if resp.candidates and hasattr(resp.candidates[0], "finish_reason") else "STOP"
|
reason = resp.candidates[0].finish_reason.name if resp.candidates and hasattr(resp.candidates[0], "finish_reason") else "STOP"
|
||||||
|
|
||||||
_append_comms("IN", "response", {"round": r_idx, "stop_reason": reason, "text": txt, "tool_calls": [{"name": c.name, "args": dict(c.args)} for c in calls], "usage": usage})
|
_append_comms("IN", "response", {"round": r_idx, "stop_reason": reason, "text": txt, "tool_calls": [{"name": c.name, "args": dict(c.args)} for c in calls], "usage": usage})
|
||||||
|
|||||||
29
events.py
29
events.py
@@ -1,14 +1,37 @@
|
|||||||
|
"""
|
||||||
|
Decoupled event emission system for cross-module communication.
|
||||||
|
"""
|
||||||
|
from typing import Callable, Any, Dict, List
|
||||||
|
|
||||||
class EventEmitter:
|
class EventEmitter:
|
||||||
|
"""
|
||||||
|
Simple event emitter for decoupled communication between modules.
|
||||||
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._listeners = {}
|
"""Initializes the EventEmitter with an empty listener map."""
|
||||||
|
self._listeners: Dict[str, List[Callable]] = {}
|
||||||
|
|
||||||
def on(self, event_name, callback):
|
def on(self, event_name: str, callback: Callable):
|
||||||
|
"""
|
||||||
|
Registers a callback for a specific event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: The name of the event to listen for.
|
||||||
|
callback: The function to call when the event is emitted.
|
||||||
|
"""
|
||||||
if event_name not in self._listeners:
|
if event_name not in self._listeners:
|
||||||
self._listeners[event_name] = []
|
self._listeners[event_name] = []
|
||||||
self._listeners[event_name].append(callback)
|
self._listeners[event_name].append(callback)
|
||||||
|
|
||||||
def emit(self, event_name, *args, **kwargs):
|
def emit(self, event_name: str, *args: Any, **kwargs: Any):
|
||||||
|
"""
|
||||||
|
Emits an event, calling all registered callbacks.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_name: The name of the event to emit.
|
||||||
|
*args: Positional arguments to pass to callbacks.
|
||||||
|
**kwargs: Keyword arguments to pass to callbacks.
|
||||||
|
"""
|
||||||
if event_name in self._listeners:
|
if event_name in self._listeners:
|
||||||
for callback in self._listeners[event_name]:
|
for callback in self._listeners[event_name]:
|
||||||
callback(*args, **kwargs)
|
callback(*args, **kwargs)
|
||||||
|
|||||||
21
gui.py
21
gui.py
@@ -829,11 +829,13 @@ class App:
|
|||||||
|
|
||||||
def _on_api_event(self, *args, **kwargs):
|
def _on_api_event(self, *args, **kwargs):
|
||||||
"""Callback for ai_client events. Queues a telemetry refresh on the main thread."""
|
"""Callback for ai_client events. Queues a telemetry refresh on the main thread."""
|
||||||
|
payload = kwargs.get("payload", {})
|
||||||
with self._pending_gui_tasks_lock:
|
with self._pending_gui_tasks_lock:
|
||||||
self._pending_gui_tasks.append({"action": "refresh_api_metrics"})
|
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
|
||||||
|
|
||||||
def _refresh_api_metrics(self):
|
def _refresh_api_metrics(self, payload: dict = None):
|
||||||
"""Updates the token budget and cache stats visualizers."""
|
"""Updates the token budget and cache stats visualizers."""
|
||||||
|
payload = payload or {}
|
||||||
self._last_bleed_update_time = time.time()
|
self._last_bleed_update_time = time.time()
|
||||||
|
|
||||||
# History bleed
|
# History bleed
|
||||||
@@ -846,21 +848,20 @@ class App:
|
|||||||
limit = stats.get("limit", 0)
|
limit = stats.get("limit", 0)
|
||||||
dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")
|
dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")
|
||||||
|
|
||||||
# Gemini cache
|
# Gemini cache - Use payload data to avoid blocking the main thread with network calls
|
||||||
if dpg.does_item_exist("gemini_cache_label"):
|
if dpg.does_item_exist("gemini_cache_label"):
|
||||||
if self.current_provider == "gemini":
|
cache_stats = payload.get("cache_stats")
|
||||||
try:
|
if cache_stats:
|
||||||
cache_stats = ai_client.get_gemini_cache_stats()
|
|
||||||
count = cache_stats.get("cache_count", 0)
|
count = cache_stats.get("cache_count", 0)
|
||||||
size_bytes = cache_stats.get("total_size_bytes", 0)
|
size_bytes = cache_stats.get("total_size_bytes", 0)
|
||||||
size_kb = size_bytes / 1024.0
|
size_kb = size_bytes / 1024.0
|
||||||
text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
|
text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
|
||||||
dpg.set_value("gemini_cache_label", text)
|
dpg.set_value("gemini_cache_label", text)
|
||||||
dpg.configure_item("gemini_cache_label", show=True)
|
dpg.configure_item("gemini_cache_label", show=True)
|
||||||
except Exception:
|
elif self.current_provider != "gemini":
|
||||||
dpg.configure_item("gemini_cache_label", show=False)
|
|
||||||
else:
|
|
||||||
dpg.configure_item("gemini_cache_label", show=False)
|
dpg.configure_item("gemini_cache_label", show=False)
|
||||||
|
# Note: We don't hide it if no stats are in payload,
|
||||||
|
# to avoid flickering during tool/chunk events that don't include stats.
|
||||||
|
|
||||||
def _update_performance_diagnostics(self):
|
def _update_performance_diagnostics(self):
|
||||||
"""Updates performance diagnostics displays (throttled)."""
|
"""Updates performance diagnostics displays (throttled)."""
|
||||||
@@ -2254,7 +2255,7 @@ class App:
|
|||||||
if cb:
|
if cb:
|
||||||
cb()
|
cb()
|
||||||
elif action == "refresh_api_metrics":
|
elif action == "refresh_api_metrics":
|
||||||
self._refresh_api_metrics()
|
self._refresh_api_metrics(task.get("payload", {}))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error executing GUI hook task: {e}")
|
print(f"Error executing GUI hook task: {e}")
|
||||||
|
|
||||||
|
|||||||
@@ -103,11 +103,11 @@ def test_cache_data_display_updates_correctly(app_instance):
|
|||||||
# We also need to mock get_history_bleed_stats as it's called in the same function
|
# We also need to mock get_history_bleed_stats as it's called in the same function
|
||||||
with patch('ai_client.get_history_bleed_stats', return_value={}):
|
with patch('ai_client.get_history_bleed_stats', return_value={}):
|
||||||
|
|
||||||
# 4. Call the method under test
|
# 4. Call the method under test with payload
|
||||||
app_instance._refresh_api_metrics()
|
app_instance._refresh_api_metrics(payload={'cache_stats': mock_cache_stats})
|
||||||
|
|
||||||
# 5. Assert the results
|
# 5. Assert the results
|
||||||
mock_get_cache_stats.assert_called_once()
|
# mock_get_cache_stats.assert_called_once() # No longer called synchronously
|
||||||
|
|
||||||
# Check that the UI item was shown and its value was set
|
# Check that the UI item was shown and its value was set
|
||||||
mock_configure_item.assert_any_call("gemini_cache_label", show=True)
|
mock_configure_item.assert_any_call("gemini_cache_label", show=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user