feat(mma): Phase 1 — source_tier tagging at emission

- ai_client: add current_tier module var; stamp source_tier on every _append_comms entry
- multi_agent_conductor: set current_tier='Tier 3' around send(), clear in finally
- conductor_tech_lead: set current_tier='Tier 2' around send(), clear in finally
- gui_2: _on_tool_log captures current_tier; _append_tool_log stores dict with source_tier
- tests: 8 new tests covering current_tier, source_tier in comms, tool log dict format
This commit is contained in:
2026-03-02 16:18:00 -05:00
parent 264b04f060
commit 8d9f25d0ce
5 changed files with 152 additions and 14 deletions

View File

@@ -89,6 +89,9 @@ comms_log_callback: Callable[[dict[str, Any]], None] | None = None
# Signature: (script: str, result: str) -> None
tool_log_callback: Callable[[str, str], None] | None = None
# Tier label (e.g. "Tier 2", "Tier 3") set by caller tiers before ai_client.send() and reset to None in finally; None means a main-session call.
# Safe — ai_client.send() calls are serialized by the MMA engine executor.
current_tier: str | None = None
# Increased to allow thorough code exploration before forcing a summary
MAX_TOOL_ROUNDS: int = 10
@@ -134,12 +137,13 @@ COMMS_CLAMP_CHARS: int = 300
def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
entry = {
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
"direction": direction,
"kind": kind,
"provider": _provider,
"model": _model,
"payload": payload,
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
"direction": direction,
"kind": kind,
"provider": _provider,
"model": _model,
"payload": payload,
"source_tier": current_tier, # set/cleared by caller tiers; None for main-session calls
}
_comms_log.append(entry)
if comms_log_callback is not None:

View File

@@ -19,6 +19,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
# Set custom system prompt for this call
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt)
ai_client.current_tier = "Tier 2"
try:
# 3. Call Tier 2 Model
response = ai_client.send(
@@ -41,11 +42,11 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
return tickets
except Exception as e:
print(f"Error parsing Tier 2 response: {e}")
# print(f"Raw response: {response}")
return []
finally:
# Restore old system prompt
# Restore old system prompt and clear tier tag
ai_client.set_custom_system_prompt(old_system_prompt)
ai_client.current_tier = None
from dag_engine import TrackDAG
from models import Ticket

View File

@@ -276,10 +276,10 @@ class App:
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
}
self._tool_log: list[tuple[str, str, float]] = []
self._tool_log: list[dict] = []
self._comms_log: list[dict[str, Any]] = []
self._pending_comms: list[dict[str, Any]] = []
self._pending_tool_calls: list[tuple[str, str, float]] = []
self._pending_tool_calls: list[dict] = []
self._pending_history_adds: list[dict[str, Any]] = []
self._trigger_blink = False
self._is_blinking = False
@@ -891,8 +891,9 @@ class App:
def _on_tool_log(self, script: str, result: str) -> None:
session_logger.log_tool_call(script, result, None)
source_tier = ai_client.current_tier
with self._pending_tool_calls_lock:
self._pending_tool_calls.append((script, result, time.time()))
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
def _on_api_event(self, *args, **kwargs) -> None:
payload = kwargs.get("payload", {})
@@ -1488,8 +1489,8 @@ class App:
return True
return False
def _append_tool_log(self, script: str, result: str) -> None:
self._tool_log.append((script, result, time.time()))
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None:
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True

View File

@@ -308,6 +308,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
old_comms_cb(entry)
ai_client.comms_log_callback = worker_comms_callback
ai_client.current_tier = "Tier 3"
try:
comms_baseline = len(ai_client.get_comms_log())
response = ai_client.send(
@@ -320,7 +321,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
)
finally:
ai_client.comms_log_callback = old_comms_cb
ai_client.current_tier = None
if event_queue:
# Push via "response" event type — _process_event_queue wraps this
# as {"action": "handle_ai_response", "payload": ...} for the GUI.

View File

@@ -0,0 +1,131 @@
"""
Tests for mma_agent_focus_ux_20260302 — Phase 1: Tier Tagging at Emission.
These tests are written RED-first: they fail before implementation.
"""
from typing import Generator
import pytest
from unittest.mock import patch
import ai_client
from gui_2 import App
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """Yield an ``App`` built while its external hooks are patched out.

    The patches cover ``gui_2.load_config`` (which returns a minimal stub
    config), ``gui_2.save_config``, ``gui_2.project_manager``,
    ``gui_2.session_logger``, ``gui_2.immapp.run`` and the ``App`` methods
    ``_load_active_project``, ``_fetch_models``, ``_load_fonts`` and
    ``_post_init``. The patches stay active for as long as the test holds
    the fixture, because the instance is yielded from inside the ``with``.
    """
    stub_config = {'gui': {'show_windows': {}}}
    with (
        patch('gui_2.load_config', return_value=stub_config),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
    ):
        patched_app = App()
        yield patched_app
@pytest.fixture(autouse=True)
def reset_tier():
    """Clear ``ai_client.current_tier`` on both sides of every test.

    The attribute is only touched when it already exists, so the fixture
    does not create it on a module that lacks it.
    """
    def _clear_tier() -> None:
        # Guarded so a missing attribute stays missing.
        if hasattr(ai_client, "current_tier"):
            ai_client.current_tier = None

    _clear_tier()
    yield
    _clear_tier()
# ---------------------------------------------------------------------------
# Task 1.1 / 1.2: current_tier variable and source_tier in _append_comms
# ---------------------------------------------------------------------------
def test_current_tier_variable_exists():
    """The ai_client module must define a ``current_tier`` attribute."""
    tier_is_exposed = hasattr(ai_client, "current_tier")
    assert tier_is_exposed, "ai_client.current_tier does not exist — Task 1.1 not implemented"
def test_append_comms_has_source_tier_key():
    """An entry written by ``_append_comms`` must carry a 'source_tier' key."""
    ai_client.clear_comms_log()
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    entries = ai_client.get_comms_log()
    assert len(entries) > 0, "comms log is empty after _append_comms"

    newest = entries[-1]
    assert "source_tier" in newest, f"'source_tier' key missing from comms entry: {newest}"
def test_append_comms_source_tier_none_when_unset():
    """With ``current_tier`` left as None, the logged source_tier is None."""
    ai_client.clear_comms_log()
    ai_client.current_tier = None
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier is None, f"Expected source_tier=None, got {recorded_tier}"
def test_append_comms_source_tier_set_when_current_tier_set():
    """A non-None ``current_tier`` is copied into the entry's source_tier."""
    ai_client.clear_comms_log()
    ai_client.current_tier = "Tier 3"
    ai_client._append_comms("OUT", "request", {"text": "hello"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier == "Tier 3", f"Expected source_tier='Tier 3', got {recorded_tier}"
def test_append_comms_source_tier_tier2():
    """Setting ``current_tier`` to 'Tier 2' yields source_tier 'Tier 2'."""
    ai_client.clear_comms_log()
    ai_client.current_tier = "Tier 2"
    ai_client._append_comms("IN", "response", {"text": "result"})

    newest = ai_client.get_comms_log()[-1]
    recorded_tier = newest["source_tier"]
    assert recorded_tier == "Tier 2", f"Expected source_tier='Tier 2', got {recorded_tier}"
# ---------------------------------------------------------------------------
# Task 1.5: _tool_log stores dicts with source_tier
# ---------------------------------------------------------------------------
def test_append_tool_log_stores_dict(app_instance):
    """``_append_tool_log`` appends exactly one entry, and that entry is a dict."""
    count_before = len(app_instance._tool_log)
    app_instance._append_tool_log("echo hello", "output", "Tier 3")

    count_after = len(app_instance._tool_log)
    assert count_after == count_before + 1, "_tool_log length did not increase"

    newest = app_instance._tool_log[-1]
    assert isinstance(newest, dict), f"_tool_log entry is a {type(newest).__name__}, expected dict"
def test_append_tool_log_dict_has_source_tier(app_instance):
    """The stored tool-log entry carries the source_tier passed by the caller."""
    app_instance._append_tool_log("ls", "file1\nfile2", "Tier 3")

    newest = app_instance._tool_log[-1]
    assert "source_tier" in newest, f"'source_tier' missing from tool log dict: {newest}"
    assert newest["source_tier"] == "Tier 3"
def test_append_tool_log_dict_keys(app_instance):
    """The stored entry has script, result, ts and source_tier keys with the passed values."""
    required_keys = ("script", "result", "ts", "source_tier")
    app_instance._append_tool_log("pwd", "/projects", None)

    newest = app_instance._tool_log[-1]
    for key in required_keys:
        assert key in newest, f"key '{key}' missing from tool log entry: {newest}"

    assert newest["script"] == "pwd"
    assert newest["result"] == "/projects"
    assert newest["source_tier"] is None