Compare commits

...

2 Commits

15 changed files with 372 additions and 90 deletions

View File

@@ -1614,6 +1614,12 @@ def run_tier4_analysis(stderr: str) -> str:
return f"[QA ANALYSIS FAILED] {e}" return f"[QA ANALYSIS FAILED] {e}"
# ------------------------------------------------------------------ unified send # ------------------------------------------------------------------ unified send
import json
from typing import Any, Callable, Optional, List
# Assuming _model, _system_prompt, _provider, _send_lock are module-level variables
# and the _send_xxx functions are also defined at module level.
def send( def send(
md_content: str, md_content: str,
user_message: str, user_message: str,
@@ -1639,6 +1645,62 @@ def send(
pre_tool_callback : Optional callback (payload: str) -> bool called before tool execution pre_tool_callback : Optional callback (payload: str) -> bool called before tool execution
qa_callback : Optional callback (stderr: str) -> str called for Tier 4 error analysis qa_callback : Optional callback (stderr: str) -> str called for Tier 4 error analysis
""" """
# --- START MOCK LOGIC ---
# Assuming _model, _custom_system_prompt, and _system_prompt are module-level variables.
# If _model is not 'mock', proceed to original provider logic.
if _model == 'mock':
mock_response_content = None
# Use _custom_system_prompt for keyword detection
current_system_prompt = _custom_system_prompt # Assuming _custom_system_prompt is accessible and defined
if 'tier1_epic_init' in current_system_prompt:
mock_response_content = [
{
"id": "mock-track-1",
"type": "epic",
"module": "conductor",
"persona": "Tier 1 Orchestrator",
"severity": "high",
"goal": "Initialize a new track.",
"acceptance_criteria": "Track created successfully with required fields."
},
{
"id": "mock-track-2",
"type": "epic",
"module": "conductor",
"persona": "Tier 1 Orchestrator",
"severity": "medium",
"goal": "Initialize another track.",
"acceptance_criteria": "Second track created successfully."
}
]
elif 'tier2_sprint_planning' in current_system_prompt:
mock_response_content = [
{
"id": "mock-ticket-1",
"type": "story",
"goal": "Implement feature X.",
"target_file": "src/feature_x.py",
"depends_on": [],
"context_requirements": ["requirements.txt", "main.py"]
},
{
"id": "mock-ticket-2",
"type": "bug",
"goal": "Fix bug Y.",
"target_file": "src/bug_y.py",
"depends_on": ["mock-ticket-1"],
"context_requirements": ["tests/test_bug_y.py"]
}
]
else:
mock_response_content = "Mock AI Response"
# The function is typed to return 'str', so we return a JSON string.
# Ensure 'json' is imported at the module level.
return json.dumps(mock_response_content)
# --- END MOCK LOGIC ---
with _send_lock: with _send_lock:
if _provider == "gemini": if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback) return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history, pre_tool_callback, qa_callback)

View File

@@ -1,4 +1,4 @@
from __future__ import annotations from __future__ import annotations
import json import json
import threading import threading
import uuid import uuid
@@ -123,11 +123,15 @@ class HookHandler(BaseHTTPRequestHandler):
def get_mma(): def get_mma():
try: try:
result["mma_status"] = getattr(app, "mma_status", "idle") result["mma_status"] = getattr(app, "mma_status", "idle")
result["ai_status"] = getattr(app, "ai_status", "idle")
result["active_tier"] = getattr(app, "active_tier", None) result["active_tier"] = getattr(app, "active_tier", None)
result["active_track"] = getattr(app, "active_track", None) result["active_track"] = getattr(app, "active_track", None)
result["active_tickets"] = getattr(app, "active_tickets", []) result["active_tickets"] = getattr(app, "active_tickets", [])
result["mma_step_mode"] = getattr(app, "mma_step_mode", False) result["mma_step_mode"] = getattr(app, "mma_step_mode", False)
result["pending_approval"] = app._pending_mma_approval is not None result["pending_approval"] = app._pending_mma_approval is not None
# Added lines for tracks and proposed_tracks
result["tracks"] = getattr(app, "tracks", [])
result["proposed_tracks"] = getattr(app, "proposed_tracks", [])
finally: finally:
event.set() event.set()
with app._pending_gui_tasks_lock: with app._pending_gui_tasks_lock:

View File

@@ -5,8 +5,8 @@
- [x] Task: Implement helper methods in `ApiHookClient` for querying specific DearPyGui item states (e.g., `get_text_value`, `get_node_status`). 2a30e62 - [x] Task: Implement helper methods in `ApiHookClient` for querying specific DearPyGui item states (e.g., `get_text_value`, `get_node_status`). 2a30e62
## Phase 2: Epic & Track Verification ## Phase 2: Epic & Track Verification
- [~] Task: Write the simulation routine to trigger a new Epic and verify the Track Browser updates correctly. - [x] Task: Write the simulation routine to trigger a new Epic and verify the Track Browser updates correctly. 605dfc3
- [ ] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer. - [~] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer.
## Phase 3: DAG & Spawn Interception Verification ## Phase 3: DAG & Spawn Interception Verification
- [ ] Task: Simulate the "Start Track" action and verify the DAG visualizer populates with tasks. - [ ] Task: Simulate the "Start Track" action and verify the DAG visualizer populates with tasks.

View File

@@ -0,0 +1,50 @@
discussion = []
[metadata]
id = "track_51dabc55"
name = "Implement a robust mathematical engine for basic a"
status = "todo"
created_at = "2026-02-28T21:06:22.065199"
updated_at = "2026-02-28T21:06:22.065199"
[[tasks]]
id = "math_engine_add"
description = "Implement the addition operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = []
step_mode = false
[[tasks]]
id = "math_engine_subtract"
description = "Implement the subtraction operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_add",
]
step_mode = false
[[tasks]]
id = "math_engine_multiply"
description = "Implement the multiplication operation for the mathematical engine."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_subtract",
]
step_mode = false
[[tasks]]
id = "math_engine_divide"
description = "Implement the division operation for the mathematical engine, including handling division by zero."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"math_engine_multiply",
]
step_mode = false

View File

@@ -0,0 +1,75 @@
discussion = []
[metadata]
id = "track_d01fdb6e"
name = "Implement a robust, testable arithmetic engine for"
status = "todo"
created_at = "2026-02-28T21:00:16.295678"
updated_at = "2026-02-28T21:00:16.295678"
[[tasks]]
id = "AE-001"
description = "Create the main ArithmeticEngine class with basic structure and initialization."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = []
step_mode = false
[[tasks]]
id = "AE-002"
description = "Implement the 'add' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-003"
description = "Implement the 'subtract' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-004"
description = "Implement the 'multiply' method in the ArithmeticEngine class."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-005"
description = "Implement the 'divide' method in the ArithmeticEngine class, including division by zero handling."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-001",
]
step_mode = false
[[tasks]]
id = "AE-006"
description = "Add comprehensive unit tests for all arithmetic operations."
status = "todo"
assigned_to = "unassigned"
context_requirements = []
depends_on = [
"AE-002",
"AE-003",
"AE-004",
"AE-005",
]
step_mode = false

View File

@@ -29,12 +29,14 @@ All tasks follow a strict lifecycle:
- **Analyze Changes:** Use `get_git_diff` if the task involves modifying recently updated code. - **Analyze Changes:** Use `get_git_diff` if the task involves modifying recently updated code.
- **Minimize Token Burn:** Only use `read_file` with `start_line`/`end_line` for specific implementation details once target areas are identified. - **Minimize Token Burn:** Only use `read_file` with `start_line`/`end_line` for specific implementation details once target areas are identified.
4. **Write Failing Tests (Red Phase):** 4. **Write Failing Tests (Red Phase):**
- **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria. - **Pre-Delegation Checkpoint:** Before spawning a worker for dangerous or non-trivial changes, ensure your current progress is staged (`git add .`) or committed. This prevents losing iterations if a sub-agent incorrectly uses `git restore`.
- **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria. (If repeating due to failures, pass `--failure-count X` to switch to a more capable model).
- Take the code generated by the Worker and apply it. - Take the code generated by the Worker and apply it.
- **CRITICAL:** Run the tests and confirm that they fail as expected. This is the "Red" phase of TDD. Do not proceed until you have failing tests. - **CRITICAL:** Run the tests and confirm that they fail as expected. This is the "Red" phase of TDD. Do not proceed until you have failing tests.
4. **Implement to Pass Tests (Green Phase):** 4. **Implement to Pass Tests (Green Phase):**
- **Delegate Implementation:** Do NOT write the implementation code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a highly specific prompt to write the minimum amount of application code necessary to make the failing tests pass. - **Pre-Delegation Checkpoint:** Ensure current progress is staged or committed before delegating.
- **Delegate Implementation:** Do NOT write the implementation code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a highly specific prompt to write the minimum amount of application code necessary to make the failing tests pass. (If repeating due to failures, pass `--failure-count X` to switch to a more capable model).
- Take the code generated by the Worker and apply it. - Take the code generated by the Worker and apply it.
- Run the test suite again and confirm that all tests now pass. This is the "Green" phase. - Run the test suite again and confirm that all tests now pass. This is the "Green" phase.

View File

@@ -9,9 +9,9 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets. Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
""" """
# 1. Set Tier 2 Model (Tech Lead - Flash) # 1. Set Tier 2 Model (Tech Lead - Flash)
if ai_client._model != 'mock':
ai_client.set_provider('gemini', 'gemini-2.5-flash-lite') ai_client.set_provider('gemini', 'gemini-2.5-flash-lite')
ai_client.reset_session() ai_client.reset_session() # 2. Construct Prompt
# 2. Construct Prompt
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning") system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = ( user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n" f"### TRACK BRIEF:\n{track_brief}\n\n"
@@ -77,3 +77,4 @@ if __name__ == "__main__":
test_skeletons = "class NewFeature: pass" test_skeletons = "class NewFeature: pass"
tickets = generate_tickets(test_brief, test_skeletons) tickets = generate_tickets(test_brief, test_skeletons)
print(json.dumps(tickets, indent=2)) print(json.dumps(tickets, indent=2))

View File

@@ -1,6 +1,6 @@
[ai] [ai]
provider = "gemini_cli" provider = "gemini_cli"
model = "gemini-2.5-flash-lite" model = "mock"
temperature = 0.0 temperature = 0.0
max_tokens = 8192 max_tokens = 8192
history_trunc_limit = 8000 history_trunc_limit = 8000

View File

@@ -896,6 +896,8 @@ class App:
user_data = task.get("user_data") user_data = task.get("user_data")
if item == "btn_project_new_automated": if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data) self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(user_data)
elif item in self._clickable_actions: elif item in self._clickable_actions:
# Check if it's a method that accepts user_data # Check if it's a method that accepts user_data
import inspect import inspect
@@ -935,6 +937,8 @@ class App:
self._pending_mma_approval = task self._pending_mma_approval = task
if "dialog_container" in task: if "dialog_container" in task:
task["dialog_container"][0] = dlg task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval": elif action == "mma_spawn_approval":
dlg = MMASpawnApprovalDialog( dlg = MMASpawnApprovalDialog(
task.get("ticket_id"), task.get("ticket_id"),
@@ -1959,9 +1963,36 @@ class App:
def _cb_accept_tracks(self) -> None: def _cb_accept_tracks(self) -> None:
def _bg_task(): def _bg_task():
for track_data in self.proposed_tracks: # Generate skeletons once
self._start_track_logic(track_data) self.ai_status = "Phase 2: Generating skeletons for all tracks..."
self.ai_status = "Tracks accepted and execution started." parser = ASTParser(language="python")
generated_skeletons = ""
try:
for i, file_path in enumerate(self.files):
try:
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
abs_path = Path(self.ui_files_base_dir) / file_path
if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f:
code = f.read()
generated_skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
except Exception as e:
print(f"Error parsing skeleton for {file_path}: {e}")
except Exception as e:
self.ai_status = f"Error generating skeletons: {e}"
print(f"Error generating skeletons: {e}")
return # Exit if skeleton generation fails
# Now loop through tracks and call _start_track_logic with generated skeletons
total_tracks = len(self.proposed_tracks)
for i, track_data in enumerate(self.proposed_tracks):
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..."
self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started
self.ai_status = f"All {total_tracks} tracks accepted and execution started."
threading.Thread(target=_bg_task, daemon=True).start() threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data: Any = None) -> None: def _cb_start_track(self, user_data: Any = None) -> None:
@@ -1976,14 +2007,16 @@ class App:
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
self.ai_status = f"Track '{title}' started." self.ai_status = f"Track '{title}' started."
def _start_track_logic(self, track_data: dict[str, Any]) -> None: def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
try: try:
goal = track_data.get("goal", "") goal = track_data.get("goal", "")
title = track_data.get("title") or track_data.get("goal", "Untitled Track") title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self.ai_status = f"Phase 2: Generating tickets for {title}..." self.ai_status = f"Phase 2: Generating tickets for {title}..."
skeletons = "" # Initialize skeletons variable
if skeletons_str is None: # Only generate if not provided
# 1. Get skeletons for context # 1. Get skeletons for context
parser = ASTParser(language="python") parser = ASTParser(language="python")
skeletons = ""
for i, file_path in enumerate(self.files): for i, file_path in enumerate(self.files):
try: try:
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
@@ -1991,9 +2024,12 @@ class App:
if abs_path.exists() and abs_path.suffix == ".py": if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f: with open(abs_path, "r", encoding="utf-8") as f:
code = f.read() code = f.read()
skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n" skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
except Exception as e: except Exception as e:
print(f"Error parsing skeleton for {file_path}: {e}") print(f"Error parsing skeleton for {file_path}: {e}")
else:
skeletons = skeletons_str # Use provided skeletons
self.ai_status = "Phase 2: Calling Tech Lead..." self.ai_status = "Phase 2: Calling Tech Lead..."
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
if not raw_tickets: if not raw_tickets:

View File

@@ -79,7 +79,7 @@ DockId=0x0000000F,2
[Window][Theme] [Window][Theme]
Pos=0,17 Pos=0,17
Size=632,824 Size=32,824
Collapsed=0 Collapsed=0
DockId=0x00000005,1 DockId=0x00000005,1
@@ -89,14 +89,14 @@ Size=900,700
Collapsed=0 Collapsed=0
[Window][Diagnostics] [Window][Diagnostics]
Pos=634,17 Pos=34,17
Size=911,643 Size=765,545
Collapsed=0 Collapsed=0
DockId=0x00000010,0 DockId=0x00000010,0
[Window][Context Hub] [Window][Context Hub]
Pos=0,17 Pos=0,17
Size=632,824 Size=32,824
Collapsed=0 Collapsed=0
DockId=0x00000005,0 DockId=0x00000005,0
@@ -107,26 +107,26 @@ Collapsed=0
DockId=0x0000000D,0 DockId=0x0000000D,0
[Window][Discussion Hub] [Window][Discussion Hub]
Pos=1547,17 Pos=801,17
Size=879,1395 Size=879,1183
Collapsed=0 Collapsed=0
DockId=0x00000004,0 DockId=0x00000004,0
[Window][Operations Hub] [Window][Operations Hub]
Pos=634,17 Pos=34,17
Size=911,643 Size=765,545
Collapsed=0 Collapsed=0
DockId=0x00000010,1 DockId=0x00000010,1
[Window][Files & Media] [Window][Files & Media]
Pos=0,843 Pos=0,843
Size=632,569 Size=32,357
Collapsed=0 Collapsed=0
DockId=0x00000006,1 DockId=0x00000006,1
[Window][AI Settings] [Window][AI Settings]
Pos=0,843 Pos=0,843
Size=632,569 Size=32,357
Collapsed=0 Collapsed=0
DockId=0x00000006,0 DockId=0x00000006,0
@@ -136,14 +136,14 @@ Size=416,325
Collapsed=0 Collapsed=0
[Window][MMA Dashboard] [Window][MMA Dashboard]
Pos=634,662 Pos=34,564
Size=911,750 Size=765,636
Collapsed=0 Collapsed=0
DockId=0x00000011,0 DockId=0x00000011,0
[Window][Log Management] [Window][Log Management]
Pos=1547,17 Pos=801,17
Size=879,1395 Size=879,1183
Collapsed=0 Collapsed=0
DockId=0x00000004,1 DockId=0x00000004,1
@@ -173,7 +173,7 @@ Column 3 Weight=1.0000
DockNode ID=0x00000008 Pos=3125,170 Size=593,1157 Split=Y DockNode ID=0x00000008 Pos=3125,170 Size=593,1157 Split=Y
DockNode ID=0x00000009 Parent=0x00000008 SizeRef=1029,147 Selected=0x0469CA7A DockNode ID=0x00000009 Parent=0x00000008 SizeRef=1029,147 Selected=0x0469CA7A
DockNode ID=0x0000000A Parent=0x00000008 SizeRef=1029,145 Selected=0xDF822E02 DockNode ID=0x0000000A Parent=0x00000008 SizeRef=1029,145 Selected=0xDF822E02
DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=2426,1395 Split=Y DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=1680,1183 Split=Y
DockNode ID=0x0000000C Parent=0xAFC85805 SizeRef=1362,1041 Split=X Selected=0x5D11106F DockNode ID=0x0000000C Parent=0xAFC85805 SizeRef=1362,1041 Split=X Selected=0x5D11106F
DockNode ID=0x00000003 Parent=0x0000000C SizeRef=1545,1183 Split=X DockNode ID=0x00000003 Parent=0x0000000C SizeRef=1545,1183 Split=X
DockNode ID=0x0000000B Parent=0x00000003 SizeRef=404,1186 Split=Y Selected=0xF4139CA2 DockNode ID=0x0000000B Parent=0x00000003 SizeRef=404,1186 Split=Y Selected=0xF4139CA2
@@ -182,7 +182,7 @@ DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=2426,1395 Sp
DockNode ID=0x00000005 Parent=0x00000007 SizeRef=295,824 Selected=0xF4139CA2 DockNode ID=0x00000005 Parent=0x00000007 SizeRef=295,824 Selected=0xF4139CA2
DockNode ID=0x00000006 Parent=0x00000007 SizeRef=295,724 CentralNode=1 Selected=0x7BD57D6A DockNode ID=0x00000006 Parent=0x00000007 SizeRef=295,724 CentralNode=1 Selected=0x7BD57D6A
DockNode ID=0x0000000E Parent=0x00000002 SizeRef=911,858 Split=Y Selected=0x418C7449 DockNode ID=0x0000000E Parent=0x00000002 SizeRef=911,858 Split=Y Selected=0x418C7449
DockNode ID=0x00000010 Parent=0x0000000E SizeRef=868,545 Selected=0x418C7449 DockNode ID=0x00000010 Parent=0x0000000E SizeRef=868,545 Selected=0xB4CBF21A
DockNode ID=0x00000011 Parent=0x0000000E SizeRef=868,636 Selected=0x3AEC3498 DockNode ID=0x00000011 Parent=0x0000000E SizeRef=868,636 Selected=0x3AEC3498
DockNode ID=0x00000001 Parent=0x0000000B SizeRef=1029,775 Selected=0x8B4EBFA6 DockNode ID=0x00000001 Parent=0x0000000B SizeRef=1029,775 Selected=0x8B4EBFA6
DockNode ID=0x0000000D Parent=0x00000003 SizeRef=435,1186 Selected=0x363E93D6 DockNode ID=0x0000000D Parent=0x00000003 SizeRef=435,1186 Selected=0x363E93D6

View File

@@ -15,11 +15,13 @@ To ensure proper environment handling and logging, you MUST NOT call the `gemini
## 1. The Tier 3 Worker (Execution) ## 1. The Tier 3 Worker (Execution)
When performing code modifications or implementing specific requirements: When performing code modifications or implementing specific requirements:
1. **DO NOT** perform large code writes yourself. 1. **Pre-Delegation Checkpoint:** For dangerous or non-trivial changes, ALWAYS stage your changes (`git add .`) or commit before delegating to a Tier 3 Worker. If the worker fails or runs `git restore`, you will lose all prior AI iterations for that file if it wasn't staged/committed.
2. **DO** construct a single, highly specific prompt with a clear objective. 2. **DO NOT** perform large code writes yourself.
3. **DO** spawn a Tier 3 Worker. 3. **DO** construct a single, highly specific prompt with a clear objective.
4. **DO** spawn a Tier 3 Worker.
*Command:* `uv run python scripts/mma_exec.py --role tier3-worker "Implement [SPECIFIC_INSTRUCTION] in [FILE_PATH]. Follow TDD and return success status or code changes."` *Command:* `uv run python scripts/mma_exec.py --role tier3-worker "Implement [SPECIFIC_INSTRUCTION] in [FILE_PATH]. Follow TDD and return success status or code changes."`
4. The Tier 3 Worker is stateless and has tool access for file I/O. 5. **Handling Repeated Failures:** If a Tier 3 Worker fails multiple times on the same task, it may lack the necessary capability. You must track failures and retry with `--failure-count <N>` (e.g., `--failure-count 2`). This tells `mma_exec.py` to escalate the sub-agent to a more powerful reasoning model (like `gemini-3-flash`).
6. The Tier 3 Worker is stateless and has tool access for file I/O.
## 2. The Tier 4 QA Agent (Diagnostics) ## 2. The Tier 4 QA Agent (Diagnostics)
If you run a test or command that fails with a significant error or large traceback: If you run a test or command that fails with a significant error or large traceback:

View File

@@ -57,13 +57,15 @@ def generate_skeleton(code: str) -> str:
except Exception as e: except Exception as e:
return f"# Error generating skeleton: {e}\n{code}" return f"# Error generating skeleton: {e}\n{code}"
def get_model_for_role(role: str) -> str: def get_model_for_role(role: str, failure_count: int = 0) -> str:
"""Returns the specific model to use for a given tier role.""" """Returns the specific model to use for a given tier role."""
if role == 'tier1-orchestrator' or role == 'tier1': if role == 'tier1-orchestrator' or role == 'tier1':
return 'gemini-3.1-pro-preview' return 'gemini-3.1-pro-preview'
elif role == 'tier2-tech-lead' or role == 'tier2': elif role == 'tier2-tech-lead' or role == 'tier2':
return 'gemini-3-flash-preview' return 'gemini-3-flash-preview'
elif role == 'tier3-worker' or role == 'tier3': elif role == 'tier3-worker' or role == 'tier3':
if failure_count > 1:
return 'gemini-3-flash'
return 'gemini-2.5-flash-lite' return 'gemini-2.5-flash-lite'
elif role == 'tier4-qa' or role == 'tier4': elif role == 'tier4-qa' or role == 'tier4':
return 'gemini-2.5-flash-lite' return 'gemini-2.5-flash-lite'
@@ -124,8 +126,8 @@ def get_dependencies(filepath: str) -> list[str]:
print(f"Error getting dependencies for {filepath}: {e}") print(f"Error getting dependencies for {filepath}: {e}")
return [] return []
def execute_agent(role: str, prompt: str, docs: list[str], debug: bool = False) -> str: def execute_agent(role: str, prompt: str, docs: list[str], debug: bool = False, failure_count: int = 0) -> str:
model = get_model_for_role(role) model = get_model_for_role(role, failure_count)
# Advanced Context: Dependency skeletons for Tier 3 # Advanced Context: Dependency skeletons for Tier 3
injected_context = "" injected_context = ""
# Whitelist of modules that sub-agents have "unfettered" (full) access to. # Whitelist of modules that sub-agents have "unfettered" (full) access to.
@@ -249,6 +251,12 @@ def create_parser() -> argparse.ArgumentParser:
action="store_true", action="store_true",
help="Enable debug logging" help="Enable debug logging"
) )
parser.add_argument(
"--failure-count",
type=int,
default=0,
help="Number of times this task has failed previously"
)
parser.add_argument( parser.add_argument(
"prompt", "prompt",
type=str, type=str,
@@ -263,6 +271,7 @@ def main() -> None:
role = args.role role = args.role
prompt = args.prompt prompt = args.prompt
debug = args.debug debug = args.debug
failure_count = args.failure_count
docs = [] docs = []
if args.task_file and os.path.exists(args.task_file): if args.task_file and os.path.exists(args.task_file):
with open(args.task_file, "rb") as f: with open(args.task_file, "rb") as f:
@@ -272,6 +281,7 @@ def main() -> None:
docs = task_data.get("docs", []) docs = task_data.get("docs", [])
# Only override debug if it's explicitly set in the task file (optional) # Only override debug if it's explicitly set in the task file (optional)
debug = task_data.get("debug", debug) debug = task_data.get("debug", debug)
failure_count = task_data.get("failure_count", failure_count)
if not role or not prompt: if not role or not prompt:
parser.print_help() parser.print_help()
return return
@@ -283,8 +293,8 @@ def main() -> None:
for ref in file_refs: for ref in file_refs:
if os.path.exists(ref) and ref not in docs: if os.path.exists(ref) and ref not in docs:
docs.append(ref) docs.append(ref)
print(f"Executing role: {role} with docs: {docs} (debug={debug})") print(f"Executing role: {role} with docs: {docs} (debug={debug}, failure_count={failure_count})")
result = execute_agent(role, prompt, docs, debug=debug) result = execute_agent(role, prompt, docs, debug=debug, failure_count=failure_count)
print(result) print(result)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -37,6 +37,6 @@ web_search = true
fetch_url = true fetch_url = true
[mma] [mma]
epic = "" epic = "Develop a new feature"
active_track_id = "" active_track_id = ""
tracks = [] tracks = []

View File

@@ -10,5 +10,5 @@ auto_add = true
[discussions.main] [discussions.main]
git_commit = "" git_commit = ""
last_updated = "2026-02-28T20:50:25" last_updated = "2026-02-28T21:27:02"
history = [] history = []

View File

@@ -9,31 +9,71 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
@pytest.mark.integration @pytest.mark.integration
def test_mma_epic_simulation(live_gui) -> None: def test_mma_complete_lifecycle(live_gui) -> None:
""" """
Integration test for MMA epic simulation. Tests the entire MMA lifecycle from epic planning to track loading and ticket verification
Red Phase: asserts False. in a single test case to avoid state dependency issues between separate test functions.
""" """
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
# Try selecting MMA Dashboard tab if applicable (using typical naming convention)
try: # 1. Set model to 'mock'.
client.select_tab('main_tab_bar', 'tab_mma')
except Exception:
pass
# Set model to mock to avoid real API calls and timeouts
try: try:
client.set_value('current_model', 'mock') client.set_value('current_model', 'mock')
except Exception: except Exception as e:
pass pytest.fail(f"Failed to set model to 'mock': {e}")
client.set_value('mma_epic_input', 'Build a simple calculator')
# 2. Enter epic and click 'Plan Epic'.
client.set_value('mma_epic_input', 'Develop a new feature')
client.click('btn_mma_plan_epic') client.click('btn_mma_plan_epic')
# Poll client.get_mma_status() every 1 second (up to 30 seconds)
success = False # 3. Wait for 'proposed_tracks'.
for i in range(30): proposed_tracks_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status() status = client.get_mma_status()
if status and status.get('tracks') and len(status['tracks']) > 0: print(f"Polling status: {status}")
success = True # Assuming 'ai_status' might be a key within the status dictionary. If not, this needs adjustment.
print(f"Polling ai_status: {status.get('ai_status', 'N/A')}")
if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
proposed_tracks_found = True
break break
time.sleep(1) time.sleep(1)
assert success, "Failed to generate at least one track." assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
# 4. Click 'Accept' to start tracks.
client.click('btn_mma_accept_tracks')
# 5. Wait for 'tracks' list to populate.
tracks_populated = False
for _ in range(30): # Poll for up to 30 seconds
status = client.get_mma_status()
if status and status.get('tracks') and len(status['tracks']) > 0:
tracks_populated = True
break
time.sleep(1)
assert tracks_populated, "Failed to populate tracks list after accepting proposed tracks."
# 6. Verify that one of the new tracks can be loaded and its tickets appear in 'active_tickets'.
status_after_tracks = client.get_mma_status()
assert status_after_tracks is not None, "Failed to get MMA status after tracks populated."
tracks_list = status_after_tracks.get('tracks')
assert tracks_list is not None and len(tracks_list) > 0, "Tracks list is empty or not found."
track_id_to_load = tracks_list[0]['id']
print(f"Attempting to load track with ID: {track_id_to_load}")
# Load the first track
client.click('btn_mma_load_track', user_data=track_id_to_load)
# Poll until 'active_track' is not None and 'active_tickets' are present
active_track_and_tickets_found = False
for _ in range(60): # Poll for up to 60 seconds
status = client.get_mma_status()
if status and status.get('active_track') == track_id_to_load and \
'active_tickets' in status and len(status['active_tickets']) > 0:
active_track_and_tickets_found = True
break
time.sleep(1)
assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets."
print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.")