feat(mma): Phase 1 — source_tier tagging at emission
- ai_client: add current_tier module var; stamp source_tier on every _append_comms entry
- multi_agent_conductor: set current_tier='Tier 3' around send(), clear in finally
- conductor_tech_lead: set current_tier='Tier 2' around send(), clear in finally
- gui_2: _on_tool_log captures current_tier; _append_tool_log stores dict with source_tier
- tests: 8 new tests covering current_tier, source_tier in comms, tool log dict format
This commit is contained in:
16
ai_client.py
16
ai_client.py
@@ -89,6 +89,9 @@ comms_log_callback: Callable[[dict[str, Any]], None] | None = None
|
||||
# Signature: (script: str, result: str) -> None
|
||||
tool_log_callback: Callable[[str, str], None] | None = None
|
||||
|
||||
# Set by caller tiers before ai_client.send(); cleared in finally.
|
||||
# Safe — ai_client.send() calls are serialized by the MMA engine executor.
|
||||
current_tier: str | None = None
|
||||
# Increased to allow thorough code exploration before forcing a summary
|
||||
MAX_TOOL_ROUNDS: int = 10
|
||||
|
||||
@@ -134,12 +137,13 @@ COMMS_CLAMP_CHARS: int = 300
|
||||
|
||||
def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
|
||||
entry = {
|
||||
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
"direction": direction,
|
||||
"kind": kind,
|
||||
"provider": _provider,
|
||||
"model": _model,
|
||||
"payload": payload,
|
||||
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
|
||||
"direction": direction,
|
||||
"kind": kind,
|
||||
"provider": _provider,
|
||||
"model": _model,
|
||||
"payload": payload,
|
||||
"source_tier": current_tier, # set/cleared by caller tiers; None for main-session calls
|
||||
}
|
||||
_comms_log.append(entry)
|
||||
if comms_log_callback is not None:
|
||||
|
||||
@@ -19,6 +19,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
|
||||
# Set custom system prompt for this call
|
||||
old_system_prompt = ai_client._custom_system_prompt
|
||||
ai_client.set_custom_system_prompt(system_prompt)
|
||||
ai_client.current_tier = "Tier 2"
|
||||
try:
|
||||
# 3. Call Tier 2 Model
|
||||
response = ai_client.send(
|
||||
@@ -41,11 +42,11 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
|
||||
return tickets
|
||||
except Exception as e:
|
||||
print(f"Error parsing Tier 2 response: {e}")
|
||||
# print(f"Raw response: {response}")
|
||||
return []
|
||||
finally:
|
||||
# Restore old system prompt
|
||||
# Restore old system prompt and clear tier tag
|
||||
ai_client.set_custom_system_prompt(old_system_prompt)
|
||||
ai_client.current_tier = None
|
||||
|
||||
from dag_engine import TrackDAG
|
||||
from models import Ticket
|
||||
|
||||
11
gui_2.py
11
gui_2.py
@@ -276,10 +276,10 @@ class App:
|
||||
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
|
||||
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
|
||||
}
|
||||
self._tool_log: list[tuple[str, str, float]] = []
|
||||
self._tool_log: list[dict] = []
|
||||
self._comms_log: list[dict[str, Any]] = []
|
||||
self._pending_comms: list[dict[str, Any]] = []
|
||||
self._pending_tool_calls: list[tuple[str, str, float]] = []
|
||||
self._pending_tool_calls: list[dict] = []
|
||||
self._pending_history_adds: list[dict[str, Any]] = []
|
||||
self._trigger_blink = False
|
||||
self._is_blinking = False
|
||||
@@ -891,8 +891,9 @@ class App:
|
||||
|
||||
def _on_tool_log(self, script: str, result: str) -> None:
|
||||
session_logger.log_tool_call(script, result, None)
|
||||
source_tier = ai_client.current_tier
|
||||
with self._pending_tool_calls_lock:
|
||||
self._pending_tool_calls.append((script, result, time.time()))
|
||||
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
|
||||
|
||||
def _on_api_event(self, *args, **kwargs) -> None:
|
||||
payload = kwargs.get("payload", {})
|
||||
@@ -1488,8 +1489,8 @@ class App:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _append_tool_log(self, script: str, result: str) -> None:
|
||||
self._tool_log.append((script, result, time.time()))
|
||||
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None:
|
||||
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
|
||||
self.ui_last_script_text = script
|
||||
self.ui_last_script_output = result
|
||||
self._trigger_script_blink = True
|
||||
|
||||
@@ -308,6 +308,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
|
||||
old_comms_cb(entry)
|
||||
|
||||
ai_client.comms_log_callback = worker_comms_callback
|
||||
ai_client.current_tier = "Tier 3"
|
||||
try:
|
||||
comms_baseline = len(ai_client.get_comms_log())
|
||||
response = ai_client.send(
|
||||
@@ -320,7 +321,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
|
||||
)
|
||||
finally:
|
||||
ai_client.comms_log_callback = old_comms_cb
|
||||
|
||||
ai_client.current_tier = None
|
||||
if event_queue:
|
||||
# Push via "response" event type — _process_event_queue wraps this
|
||||
# as {"action": "handle_ai_response", "payload": ...} for the GUI.
|
||||
|
||||
131
tests/test_mma_agent_focus_phase1.py
Normal file
131
tests/test_mma_agent_focus_phase1.py
Normal file
@@ -0,0 +1,131 @@
|
||||
"""
|
||||
Tests for mma_agent_focus_ux_20260302 — Phase 1: Tier Tagging at Emission.
|
||||
These tests are written RED-first: they fail before implementation.
|
||||
"""
|
||||
from typing import Generator
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
import ai_client
|
||||
from gui_2 import App
|
||||
|
||||
|
||||
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """Yield an ``App`` constructed with its external collaborators mocked.

    While the fixture is active, ``gui_2``'s ``load_config``, ``save_config``,
    ``project_manager``, ``session_logger`` and ``immapp.run`` are patched,
    and the ``App`` methods ``_load_active_project``, ``_fetch_models``,
    ``_load_fonts`` and ``_post_init`` are replaced with mocks.
    """
    with (
        # Module-level collaborators of gui_2.
        patch('gui_2.load_config', return_value={'gui': {'show_windows': {}}}),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        # App setup methods stubbed for the duration of the test.
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init')
    ):
        yield App()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def reset_tier():
    """Set ``ai_client.current_tier`` to ``None`` before and after each test."""

    def _clear_tier() -> None:
        # Guarded so the fixture does not create the attribute when the
        # module does not define it.
        if hasattr(ai_client, "current_tier"):
            ai_client.current_tier = None

    _clear_tier()
    yield
    _clear_tier()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Task 1.1 / 1.2: current_tier variable and source_tier in _append_comms
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_current_tier_variable_exists():
    """Check that ``ai_client`` exposes a module-level ``current_tier`` attribute."""
    tier_attr_present = hasattr(ai_client, "current_tier")
    assert tier_attr_present, (
        "ai_client.current_tier does not exist — Task 1.1 not implemented"
    )
|
||||
|
||||
|
||||
def test_append_comms_has_source_tier_key():
    """Check that an entry written by ``_append_comms`` carries a 'source_tier' key."""
    # Start from an empty log, then emit a single entry.
    ai_client.clear_comms_log()
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    entries = ai_client.get_comms_log()
    assert len(entries) >= 1, "comms log is empty after _append_comms"

    newest = entries[-1]
    assert "source_tier" in newest, (
        f"'source_tier' key missing from comms entry: {newest}"
    )
|
||||
|
||||
|
||||
def test_append_comms_source_tier_none_when_unset():
    """Check that 'source_tier' is ``None`` when ``current_tier`` is ``None``."""
    ai_client.clear_comms_log()
    ai_client.current_tier = None
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier is None, (
        f"Expected source_tier=None, got {recorded_tier}"
    )
|
||||
|
||||
|
||||
def test_append_comms_source_tier_set_when_current_tier_set():
    """Check that 'source_tier' mirrors ``current_tier`` when it is 'Tier 3'."""
    ai_client.clear_comms_log()
    ai_client.current_tier = "Tier 3"
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier == "Tier 3", (
        f"Expected source_tier='Tier 3', got {recorded_tier}"
    )
|
||||
|
||||
|
||||
def test_append_comms_source_tier_tier2():
    """Check that 'source_tier' is 'Tier 2' when ``current_tier`` is 'Tier 2'."""
    ai_client.clear_comms_log()
    ai_client.current_tier = "Tier 2"
    ai_client._append_comms("IN", "response", {"text": "result"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier == "Tier 2", (
        f"Expected source_tier='Tier 2', got {recorded_tier}"
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Task 1.5: _tool_log stores dicts with source_tier
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_append_tool_log_stores_dict(app_instance):
    """Check that ``_append_tool_log`` appends exactly one entry and that it is a dict."""
    tool_log = app_instance._tool_log
    count_before = len(tool_log)

    app_instance._append_tool_log("echo hello", "output", "Tier 3")

    # Re-read the attribute rather than reusing the earlier reference.
    assert len(app_instance._tool_log) == count_before + 1, "_tool_log length did not increase"

    newest = app_instance._tool_log[-1]
    assert isinstance(newest, dict), (
        f"_tool_log entry is a {type(newest).__name__}, expected dict"
    )
|
||||
|
||||
|
||||
def test_append_tool_log_dict_has_source_tier(app_instance):
    """Check that the stored entry records the tier passed to ``_append_tool_log``."""
    app_instance._append_tool_log("ls", "file1\nfile2", "Tier 3")

    newest = app_instance._tool_log[-1]
    assert "source_tier" in newest, f"'source_tier' missing from tool log dict: {newest}"
    assert newest["source_tier"] == "Tier 3"
|
||||
|
||||
|
||||
def test_append_tool_log_dict_keys(app_instance):
    """Check that the stored entry has script, result, ts and source_tier keys."""
    app_instance._append_tool_log("pwd", "/projects", None)
    newest = app_instance._tool_log[-1]

    # Every required key must be present before the values are inspected.
    required_keys = ("script", "result", "ts", "source_tier")
    for name in required_keys:
        assert name in newest, f"key '{name}' missing from tool log entry: {newest}"

    assert newest["script"] == "pwd"
    assert newest["result"] == "/projects"
    assert newest["source_tier"] is None
|
||||
Reference in New Issue
Block a user