Compare commits

...

5 Commits

15 changed files with 296 additions and 318 deletions

View File

@@ -261,7 +261,7 @@ def set_provider(provider: str, model: str) -> None:
if provider == "gemini_cli":
valid_models = _list_gemini_cli_models()
# If model is invalid or belongs to another provider (like deepseek), force default
if model not in valid_models or model.startswith("deepseek"):
if model != "mock" and (model not in valid_models or model.startswith("deepseek")):
_model = "gemini-3-flash-preview"
else:
_model = model
@@ -815,8 +815,8 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
global _gemini_cli_adapter
try:
if _gemini_cli_adapter is None:
_gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini")
adapter = _gemini_cli_adapter
mcp_client.configure(file_items or [], [base_dir])
# Construct the system instruction, combining the base system prompt and the current context.
sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>"
@@ -1621,16 +1621,16 @@ from typing import Any, Callable, Optional, List
# and the _send_xxx functions are also defined at module level.
def send(
md_content: str,
user_message: str,
base_dir: str = ".",
file_items: list[dict[str, Any]] | None = None,
discussion_history: str = "",
stream: bool = False,
pre_tool_callback: Optional[Callable[[str], bool]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
) -> str:
"""
"""
Send a message to the active provider.
md_content : aggregated markdown string (for Gemini: stable content only,
@@ -1645,73 +1645,16 @@ def send(
pre_tool_callback : Optional callback (payload: str) -> bool called before tool execution
qa_callback : Optional callback (stderr: str) -> str called for Tier 4 error analysis
"""
# --- START MOCK LOGIC ---
# Assuming _model, _custom_system_prompt, and _system_prompt are module-level variables.
# If _model is not 'mock', proceed to original provider logic.
if _model == 'mock':
mock_response_content = None
# Use _custom_system_prompt for keyword detection
current_system_prompt = _custom_system_prompt # Assuming _custom_system_prompt is accessible and defined
if 'tier1_epic_init' in current_system_prompt:
mock_response_content = [
{
"id": "mock-track-1",
"type": "epic",
"module": "conductor",
"persona": "Tier 1 Orchestrator",
"severity": "high",
"goal": "Initialize a new track.",
"acceptance_criteria": "Track created successfully with required fields."
},
{
"id": "mock-track-2",
"type": "epic",
"module": "conductor",
"persona": "Tier 1 Orchestrator",
"severity": "medium",
"goal": "Initialize another track.",
"acceptance_criteria": "Second track created successfully."
}
]
elif 'tier2_sprint_planning' in current_system_prompt:
mock_response_content = [
{
"id": "mock-ticket-1",
"type": "story",
"goal": "Implement feature X.",
"target_file": "src/feature_x.py",
"depends_on": [],
"context_requirements": ["requirements.txt", "main.py"]
},
{
"id": "mock-ticket-2",
"type": "bug",
"goal": "Fix bug Y.",
"target_file": "src/bug_y.py",
"depends_on": ["mock-ticket-1"],
"context_requirements": ["tests/test_bug_y.py"]
}
]
else:
mock_response_content = "Mock AI Response"
# The function is typed to return 'str', so we return a JSON string.
# Ensure 'json' is imported at the module level.
return json.dumps(mock_response_content)
# --- END MOCK LOGIC ---
with _send_lock:
if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
elif _provider == "gemini_cli":
return _send_gemini_cli(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
elif _provider == "anthropic":
return _send_anthropic(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)
elif _provider == "deepseek":
return _send_deepseek(md_content, user_message, base_dir, file_items, discussion_history, stream=stream, pre_tool_callback=pre_tool_callback, qa_callback=qa_callback)
raise ValueError(f"unknown provider: {_provider}")
def get_history_bleed_stats(md_content: str | None = None) -> dict[str, Any]:
"""
Calculates how close the current conversation history is to the token limit.
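Stepping back to the send() dispatcher above: a minimal sketch of how a caller might drive it (the provider and model names are taken from elsewhere in this diff; the exact call site is an assumption, not code from this change):

import ai_client

# Hypothetical caller: pick a provider, then route one message through send().
ai_client.set_provider("gemini_cli", "gemini-3-flash-preview")
reply = ai_client.send(
    md_content="# Aggregated project context...",  # stable content only, per the docstring
    user_message="Summarize the open tickets.",
    base_dir=".",
    discussion_history="",
)
print(reply)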

View File

@@ -125,13 +125,16 @@ class HookHandler(BaseHTTPRequestHandler):
result["mma_status"] = getattr(app, "mma_status", "idle")
result["ai_status"] = getattr(app, "ai_status", "idle")
result["active_tier"] = getattr(app, "active_tier", None)
result["active_track"] = getattr(app, "active_track", None)
at = getattr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = getattr(app, "active_tickets", [])
result["mma_step_mode"] = getattr(app, "mma_step_mode", False)
result["pending_approval"] = app._pending_mma_approval is not None
result["pending_approval"] = (getattr(app, "_pending_mma_approval", None) is not None) or getattr(app, "_pending_ask_dialog", False)
result["pending_spawn"] = getattr(app, "_pending_mma_spawn", None) is not None
# Expose tracks and proposed_tracks so external pollers can observe planning state
result["tracks"] = getattr(app, "tracks", [])
result["proposed_tracks"] = getattr(app, "proposed_tracks", [])
result["mma_streams"] = getattr(app, "mma_streams", {})
finally:
event.set()
with app._pending_gui_tasks_lock:

View File

@@ -6,13 +6,13 @@
## Phase 2: Epic & Track Verification
- [x] Task: Write the simulation routine to trigger a new Epic and verify the Track Browser updates correctly. 605dfc3
- [~] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer.
- [x] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer.
## Phase 3: DAG & Spawn Interception Verification
- [ ] Task: Simulate the "Start Track" action and verify the DAG visualizer populates with tasks.
- [ ] Task: Simulate the Auto-Queue advancing to a "Ready" task.
- [ ] Task: Verify the "Approve Worker Spawn" modal appears with the correct prompt and context.
- [ ] Task: Simulate clicking "Approve" and verify the worker's simulated output streams into the correct task detail view.
- [x] Task: Simulate the "Start Track" action and verify the DAG visualizer populates with tasks.
- [x] Task: Simulate the Auto-Queue advancing to a "Ready" task.
- [x] Task: Verify the "Approve Worker Spawn" modal appears with the correct prompt and context.
- [x] Task: Simulate clicking "Approve" and verify the worker's simulated output streams into the correct task detail view.
## Phase: Review Fixes
- [ ] Task: Apply review suggestions 605dfc3

View File

@@ -31,4 +31,15 @@ This is a multi-track phase. To ensure architectural integrity, these tracks **M
3. **MMA Dashboard Visualization Overhaul:** (Builds the UI to visualize the state and subsets)
4. **[CURRENT] Robust Live Simulation Verification:** (Builds the tests to verify the UI and state)
**Prerequisites for this track:** `MMA Dashboard Visualization Overhaul` MUST be completed (`[x]`) before starting this track.
## Session Compression (2026-02-28)
**Current State & Glaring Issues:**
1. **Brittle Interception System:** The visual simulation (`tests/visual_sim_mma_v2.py`) relies heavily on polling an `api_hooks.py` endpoint (`/api/gui/mma_status`) that aggregates several boolean flags (`pending_approval`, `pending_spawn`). This has proven extremely brittle. For example, `mock_gemini_cli.py` defaults to emitting a `read_file` tool call, which triggers the *general* tool approval popup (`_pending_ask`), freezing the test because it was expecting the *MMA spawn* popup (`_pending_mma_spawn`) or the *Track Proposal* modal.
2. **Mock Pollution in App Domain:** Previous attempts to fix the simulation shoehorned test-specific mock JSON responses directly into `ai_client.py` and `scripts/mma_exec.py`. This conflates the test environment with the production application codebase.
3. **Popup Handling Failures:** The GUI's state machine for closing popups (like `_show_track_proposal_modal` in `_cb_accept_tracks`) is desynchronized from the hook API. The test clicks "Accept", the tracks generate, but the UI state doesn't cleanly reset, leading to endless timeouts during test runs.
**Next Steps for the Handoff:**
- Completely rip out the hardcoded mock JSON arrays from `ai_client.py` and `scripts/mma_exec.py`.
- Refactor `tests/mock_gemini_cli.py` to be a pure, standalone mock that perfectly simulates the expected streaming behavior of `gemini_cli` without relying on the app to intercept specific magic prompts.
- Stabilize the hook API (`api_hooks.py`) so the test script can unambiguously distinguish between a general tool approval, an MMA step approval, and an MMA worker spawn approval, instead of relying on a fragile `pending_approval` catch-all.
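A minimal sketch of what a disambiguated status payload could look like inside HookHandler (field names here are illustrative, not a final API):

# Hypothetical replacement for the pending_approval catch-all: one flag per interception kind.
result["pending_tool_approval"] = bool(getattr(app, "_pending_ask_dialog", False))
result["pending_mma_step"] = getattr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn"] = getattr(app, "_pending_mma_spawn", None) is not None

With that split, the simulation can branch on exactly the popup it expects instead of guessing from a single boolean.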

View File

@@ -1,50 +0,0 @@
discussion = []
[metadata]
id = "track_51dabc55"
name = "Implement a robust mathematical engine for basic a"
status = "todo"
created_at = "2026-02-28T21:06:22.065199"
updated_at = "2026-02-28T21:06:22.065199"
[[tasks]]
id = "math_engine_add"
description = "Implement the addition operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = []
step_mode = false
[[tasks]]
id = "math_engine_subtract"
description = "Implement the subtraction operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_add",
]
step_mode = false
[[tasks]]
id = "math_engine_multiply"
description = "Implement the multiplication operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_subtract",
]
step_mode = false
[[tasks]]
id = "math_engine_divide"
description = "Implement the division operation for the mathematical engine, including handling division by zero."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_multiply",
]
step_mode = false

View File

@@ -1,75 +0,0 @@
discussion = []
[metadata]
id = "track_d01fdb6e"
name = "Implement a robust, testable arithmetic engine for"
status = "todo"
created_at = "2026-02-28T21:00:16.295678"
updated_at = "2026-02-28T21:00:16.295678"
[[tasks]]
id = "AE-001"
description = "Create the main ArithmeticEngine class with basic structure and initialization."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = []
step_mode = false
[[tasks]]
id = "AE-002"
description = "Implement the 'add' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-003"
description = "Implement the 'subtract' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-004"
description = "Implement the 'multiply' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-005"
description = "Implement the 'divide' method in the ArithmeticEngine class, including division by zero handling."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-006"
description = "Add comprehensive unit tests for all arithmetic operations."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-002",
"AE-003",
"AE-004",
"AE-005",
]
step_mode = false

View File

@@ -9,9 +9,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
"""
# 1. Set Tier 2 Model (Tech Lead - Flash)
if ai_client._model != 'mock':
ai_client.set_provider('gemini', 'gemini-2.5-flash-lite')
ai_client.reset_session()
# 2. Construct Prompt
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n"

View File

@@ -1,6 +1,6 @@
[ai]
provider = "gemini_cli"
model = "mock"
model = "gemini-3-flash-preview"
temperature = 0.0
max_tokens = 8192
history_trunc_limit = 8000
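Note the interplay with the set_provider guard at the top of this diff: for gemini_cli, an unknown model is forced back to the default, while "mock" bypasses validation. A quick illustration (assuming the module is imported as ai_client):

ai_client.set_provider("gemini_cli", "not-a-real-model")
# per the guard above, _model falls back to "gemini-3-flash-preview"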

View File

@@ -387,6 +387,8 @@ class App:
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_approve_tool': self._handle_approve_tool,
'btn_approve_spawn': self._handle_approve_spawn,
}
self._predefined_callbacks: dict[str, Callable[..., Any]] = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file
@@ -886,6 +888,8 @@ class App:
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
if item == "current_provider" or item == "current_model":
ai_client.set_provider(self.current_provider, self.current_model)
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=value)
@@ -984,6 +988,26 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_approve_tool(self) -> None:
"""Logic for approving a pending tool execution via API hooks."""
print("[DEBUG] _handle_approve_tool called")
if self._pending_ask_dialog:
self._handle_approve_ask()
else:
print("[DEBUG] No pending tool approval found")
def _handle_approve_spawn(self) -> None:
"""Logic for approving a pending sub-agent spawn via API hooks."""
print("[DEBUG] _handle_approve_spawn called")
if self._pending_mma_spawn:
# Synchronize with the handler logic
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
# Crucially, close the modal state so UI can continue
self._mma_spawn_open = False
self._pending_mma_spawn = None
else:
print("[DEBUG] No pending spawn approval found")
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None) -> None:
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
@@ -1962,6 +1986,7 @@ class App:
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_accept_tracks(self) -> None:
self._show_track_proposal_modal = False
def _bg_task():
# Generate skeletons once
self.ai_status = "Phase 2: Generating skeletons for all tracks..."
@@ -1996,6 +2021,23 @@ class App:
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data: Any = None) -> None:
if isinstance(user_data, str):
# If track_id is provided directly
track_id = user_data
# Ensure it's loaded as active
if not self.active_track or self.active_track.id != track_id:
self._cb_load_track(track_id)
if self.active_track:
# Use the active track object directly to start execution
self.mma_status = "running"
engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue)
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
full_md, _, _ = aggregate.run(flat)
asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
self.ai_status = f"Track '{self.active_track.description}' started."
return
idx = 0
if isinstance(user_data, int):
idx = user_data
@@ -2043,6 +2085,8 @@ class App:
print(f"Dependency error in track '{title}': {e}")
sorted_tickets_data = raw_tickets
# 3. Create Track and Ticket objects
from datetime import datetime
now = datetime.now()
tickets = []
for t_data in sorted_tickets_data:
ticket = Ticket(
@@ -2054,7 +2098,7 @@ class App:
step_mode=t_data.get("step_mode", False)
)
tickets.append(ticket)
track_id = f"track_{uuid.uuid4().hex[:8]}"
track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}"
track = Track(id=track_id, description=title, tickets=tickets)
# Initialize track state in the filesystem
from models import TrackState, Metadata
@@ -2079,6 +2123,10 @@ class App:
if self._show_track_proposal_modal:
imgui.open_popup("Track Proposal")
if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._show_track_proposal_modal:
imgui.close_current_popup()
imgui.end_popup()
return
imgui.text_colored(C_IN, "Proposed Implementation Tracks")
imgui.separator()
if not self.proposed_tracks:
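One detail worth calling out from the hunk above: switching track IDs from uuid4 to uuid5 makes them deterministic, so re-planning the same epic in the same project reuses the same ID instead of minting a new one each run. A quick illustration:

import uuid

# Same project path + title always hash to the same track id, unlike uuid4.
a = uuid.uuid5(uuid.NAMESPACE_DNS, "C:/projects/demo_My Track").hex[:12]
b = uuid.uuid5(uuid.NAMESPACE_DNS, "C:/projects/demo_My Track").hex[:12]
assert a == b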

View File

@@ -131,7 +131,7 @@ Collapsed=0
DockId=0x00000006,0
[Window][Approve Tool Execution]
Pos=512,437
Pos=1009,547
Size=416,325
Collapsed=0
@@ -147,6 +147,11 @@ Size=879,1183
Collapsed=0
DockId=0x00000004,1
[Window][Track Proposal]
Pos=709,326
Size=262,209
Collapsed=0
[Table][0xFB6E3870,4]
RefScale=13
Column 0 Width=80

View File

@@ -10,6 +10,7 @@ import datetime
LOG_FILE: str = 'logs/mma_delegation.log'
def generate_skeleton(code: str) -> str:
"""
Parses Python code and replaces function/method bodies with '...',
@@ -65,7 +66,7 @@ def get_model_for_role(role: str, failure_count: int = 0) -> str:
return 'gemini-3-flash-preview'
elif role == 'tier3-worker' or role == 'tier3':
if failure_count > 1:
return 'gemini-3-flash'
return 'gemini-3-flash-preview'
return 'gemini-2.5-flash-lite'
elif role == 'tier4-qa' or role == 'tier4':
return 'gemini-2.5-flash-lite'
@@ -126,42 +127,9 @@ def get_dependencies(filepath: str) -> list[str]:
print(f"Error getting dependencies for {filepath}: {e}")
return []
import os
import subprocess
import json
# Mock Response Definitions
MOCK_PLANNING_RESPONSE = {
"status": "success",
"message": "Mock response for planning task.",
"data": {
"task_type": "planning",
"details": "Mocked plan generated."
}
}
MOCK_GENERIC_RESPONSE = {
"status": "success",
"message": "Mock response from the agent.",
"data": {
"task_type": "generic_mock",
"details": "This is a generic mock response."
}
}
def execute_agent(role: str, prompt: str, docs: list[str], debug: bool = False, failure_count: int = 0) -> str:
model = get_model_for_role(role, failure_count)
# --- NEW MOCK HANDLING LOGIC ---
if model == 'mock':
# The 'prompt' argument here represents the user's task/command text.
if "Epic Initialization" in prompt or "Sprint Planning" in prompt:
return json.dumps(MOCK_PLANNING_RESPONSE)
else:
return json.dumps(MOCK_GENERIC_RESPONSE)
# --- END NEW MOCK HANDLING LOGIC ---
# Advanced Context: Dependency skeletons for Tier 3
injected_context = ""
# Whitelist of modules that sub-agents have "unfettered" (full) access to.
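The generate_skeleton helper at the top of this file parses Python code and replaces function/method bodies with '...'. A minimal sketch of that idea using the standard ast module (the real implementation may differ, e.g., by preserving docstrings):

import ast

class _BodyStripper(ast.NodeTransformer):
    # Replace every function/method body with a bare `...` expression.
    def visit_FunctionDef(self, node: ast.FunctionDef) -> ast.AST:
        self.generic_visit(node)  # recurse into nested defs first
        node.body = [ast.Expr(value=ast.Constant(value=...))]
        return node

    visit_AsyncFunctionDef = visit_FunctionDef  # same treatment for async defs

def skeleton(code: str) -> str:
    return ast.unparse(_BodyStripper().visit(ast.parse(code)))

print(skeleton("def add(a, b):\n    return a + b"))  # body collapses to '...'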

View File

@@ -8,21 +8,60 @@ def main() -> None:
sys.stderr.write(f"DEBUG: GEMINI_CLI_HOOK_CONTEXT: {os.environ.get('GEMINI_CLI_HOOK_CONTEXT')}\n")
# Read prompt from stdin
try:
# On Windows, stdin may be closed or raise EOFError if not handled
prompt = sys.stdin.read()
except EOFError:
prompt = ""
sys.stderr.write(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n")
sys.stderr.flush()
# Skip management commands
if len(sys.argv) > 1 and sys.argv[1] in ["mcp", "extensions", "skills", "hooks"]:
return
# Check for specific simulation contexts
# Use startswith or check the beginning of the prompt to avoid matching text inside skeletons
if 'PATH: Epic Initialization' in prompt[:500]:
mock_response = [
{"id": "mock-track-1", "type": "Track", "module": "core", "persona": "Tech Lead", "severity": "Medium", "goal": "Mock Goal 1", "acceptance_criteria": ["criteria 1"], "title": "Mock Goal 1"},
{"id": "mock-track-2", "type": "Track", "module": "ui", "persona": "Frontend Lead", "severity": "Low", "goal": "Mock Goal 2", "acceptance_criteria": ["criteria 2"], "title": "Mock Goal 2"}
]
print(json.dumps({
"type": "message",
"role": "assistant",
"content": json.dumps(mock_response)
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 100, "input_tokens": 50, "output_tokens": 50},
"session_id": "mock-session-epic"
}), flush=True)
return
if 'PATH: Sprint Planning' in prompt[:500]:
mock_response = [
{"id": "mock-ticket-1", "type": "Ticket", "goal": "Mock Ticket 1", "target_file": "file1.py", "depends_on": [], "context_requirements": "req 1"},
{"id": "mock-ticket-2", "type": "Ticket", "goal": "Mock Ticket 2", "target_file": "file2.py", "depends_on": ["mock-ticket-1"], "context_requirements": "req 2"}
]
print(json.dumps({
"type": "message",
"role": "assistant",
"content": json.dumps(mock_response)
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 100, "input_tokens": 50, "output_tokens": 50},
"session_id": "mock-session-sprint"
}), flush=True)
return
# If the prompt contains tool results, provide final answer
if '"role": "tool"' in prompt or '"tool_call_id"' in prompt:
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I have processed the tool results. Everything looks good!"
"content": "SUCCESS: Mock Tier 3 worker implemented the change. [MOCK OUTPUT]"
}), flush=True)
print(json.dumps({
"type": "result",
@@ -31,7 +70,8 @@ def main() -> None:
"session_id": "mock-session-final"
}), flush=True)
return
# Default flow: simulate a tool call
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
# Using format that bridge understands
bridge_tool_call = {
@@ -66,11 +106,6 @@ def main() -> None:
"tool_id": "call_123",
"parameters": {"path": "test.txt"}
}), flush=True)
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I am reading the file now..."
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",

View File

@@ -22,7 +22,7 @@ base_dir = "."
paths = []
[gemini_cli]
binary_path = "gemini"
binary_path = "C:\\projects\\manual_slop\\.venv\\Scripts\\python.exe C:\\projects\\manual_slop\\tests\\mock_gemini_cli.py"
[deepseek]
reasoning_effort = "medium"

View File

@@ -10,5 +10,8 @@ auto_add = true
[discussions.main]
git_commit = ""
last_updated = "2026-02-28T21:27:02"
history = []
last_updated = "2026-02-28T22:41:40"
history = [
"@2026-02-28T22:02:40\nSystem:\n[PERFORMANCE ALERT] CPU usage high: 83.5%. Please consider optimizing recent changes or reducing load.",
"@2026-02-28T22:03:10\nSystem:\n[PERFORMANCE ALERT] CPU usage high: 103.9%. Please consider optimizing recent changes or reducing load.",
]

View File

@@ -10,70 +10,159 @@ from api_hook_client import ApiHookClient
@pytest.mark.integration
def test_mma_complete_lifecycle(live_gui) -> None:
"""
"""
Tests the entire MMA lifecycle from epic planning to track loading and ticket verification
in a single test case to avoid state dependency issues between separate test functions.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
# 1. Set model to 'mock'.
try:
client.set_value('current_model', 'mock')
except Exception as e:
pytest.fail(f"Failed to set model to 'mock': {e}")
# 1. Set up the mock CLI provider
try:
client.set_value('current_provider', 'gemini_cli')
# Point the CLI adapter to our mock script
mock_cli_path = f'{sys.executable} {os.path.abspath("tests/mock_gemini_cli.py")}'
client.set_value('gcli_path', mock_cli_path)
except Exception as e:
pytest.fail(f"Failed to set up mock provider: {e}")
# 2. Enter epic and click 'Plan Epic'.
client.set_value('mma_epic_input', 'Develop a new feature')
client.click('btn_mma_plan_epic')
# 3. Wait for 'proposed_tracks'.
proposed_tracks_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status()
print(f"Polling status: {status}")
# Assuming 'ai_status' might be a key within the status dictionary. If not, this needs adjustment.
print(f"Polling ai_status: {status.get('ai_status', 'N/A')}")
if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
proposed_tracks_found = True
break
time.sleep(1)
assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
# 3. Wait for 'proposed_tracks'.
proposed_tracks_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status()
print(f"Polling status: {status}")
print(f"Polling ai_status: {status.get('ai_status', 'N/A')}")
if status and status.get('pending_spawn') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
proposed_tracks_found = True
break
time.sleep(1)
assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
# 4. Click 'Accept' to start tracks.
client.click('btn_mma_accept_tracks')
time.sleep(2)
# 5. Wait for 'tracks' list to populate.
tracks_populated = False
for _ in range(30): # Poll for up to 30 seconds
status = client.get_mma_status()
if status and status.get('tracks') and len(status['tracks']) > 0:
tracks_populated = True
break
time.sleep(1)
assert tracks_populated, "Failed to populate tracks list after accepting proposed tracks."
# 5. Wait for 'tracks' list to populate with our mock tracks.
tracks_populated = False
for _ in range(30): # Poll for up to 30 seconds
status = client.get_mma_status()
if status and status.get('pending_spawn') is True:
client.click('btn_approve_spawn')
elif status and status.get('pending_approval') is True:
client.click('btn_approve_tool')
tracks = status.get('tracks', [])
if any('Mock Goal 1' in t.get('title', '') for t in tracks):
tracks_populated = True
break
time.sleep(1)
assert tracks_populated, "Failed to find 'Mock Goal 1' in tracks list after acceptance."
# 6. Verify that one of the new tracks can be loaded and its tickets appear in 'active_tickets'.
status_after_tracks = client.get_mma_status()
assert status_after_tracks is not None, "Failed to get MMA status after tracks populated."
tracks_list = status_after_tracks.get('tracks')
assert tracks_list is not None and len(tracks_list) > 0, "Tracks list is empty or not found."
track_id_to_load = tracks_list[0]['id']
print(f"Attempting to load track with ID: {track_id_to_load}")
track_id_to_load = None
for track in tracks_list:
if 'Mock Goal 1' in track.get('title', ''):
track_id_to_load = track['id']
break
assert track_id_to_load is not None, "Could not find a track with 'Mock Goal 1' in its title."
print(f"Attempting to load track with ID: {track_id_to_load}")
# Load the first track
client.click('btn_mma_load_track', user_data=track_id_to_load)
# Poll until 'active_track' is not None and 'active_tickets' are present
active_track_and_tickets_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status()
if status and status.get('active_track') == track_id_to_load and \
'active_tickets' in status and len(status['active_tickets']) > 0:
active_track_and_tickets_found = True
break
time.sleep(1)
assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets."
# Poll until 'active_track' is not None and 'active_tickets' are present
active_track_and_tickets_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status()
print(f"Polling load status: {status}")
if status and status.get('pending_spawn') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.")
# active_track may be an object exposing .id or a plain id string; accept either form
active_track = status.get('active_track')
if status and ( (isinstance(active_track, dict) and active_track.get('id') == track_id_to_load) or (active_track == track_id_to_load) ) and \
'active_tickets' in status and len(status['active_tickets']) > 0:
active_track_and_tickets_found = True
break
time.sleep(1)
assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets."
print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.")
# 7. Start the MMA track and poll for its status.
print(f"Starting track {track_id_to_load}...")
client.click('btn_mma_start_track', user_data=track_id_to_load)
mma_running = False
for _ in range(120): # Poll for up to 120 seconds
status = client.get_mma_status()
print(f"Polling MMA status for 'running': {status.get('mma_status')}")
# Handle pending states during the run
if status and status.get('pending_spawn') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
# Check if MMA is running
if status and status.get('mma_status') == 'running':
mma_running = True
break
# Also check if it's already finished or error
if status and status.get('mma_status') in ['done', 'error']:
break
time.sleep(1)
assert mma_running or (status and status.get('mma_status') == 'done'), f"Timed out waiting for MMA status to become 'running' for track {track_id_to_load}."
print(f"MMA status is: {status.get('mma_status')}")
# 8. Verify 'active_tier' change and output in 'mma_streams'.
streams_found = False
for _ in range(60): # Give it more time for the worker to spawn and respond
status = client.get_mma_status()
# Handle approvals if they pop up during worker execution
if status and status.get('pending_spawn') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
streams = status.get('mma_streams', {})
print(f"Polling streams: {list(streams.keys())}")
if streams and any("Tier 3" in k for k in streams.keys()):
print(f"[SIM] Found Tier 3 worker output in streams: {list(streams.keys())}")
# Check for our specific mock content
tier3_key = [k for k in streams.keys() if "Tier 3" in k][0]
if "SUCCESS: Mock Tier 3 worker" in streams[tier3_key]:
print("[SIM] Verified mock worker output content.")
streams_found = True
break
time.sleep(1)
assert streams_found, "No Tier 3 mock output found in 'mma_streams'."
print("MMA complete lifecycle simulation successful.")