"""Mock gemini CLI used by the concurrent-MMA-tracks tests.

The script reads a prompt from stdin, picks a canned response based on the
prompt text, and prints two JSON lines to stdout: an assistant ``message``
followed by a ``result`` carrying token stats and a ``session_id``.
"""

import json
import os
import re
import sys

# Persistent call counter (file-based so the mock survives across subprocess
# invocations). The mock gemini CLI is a short-lived subprocess invoked once
# per send() call. The path is resolved relative to this file's directory
# (``<this dir>/../artifacts``), so it does not depend on the subprocess cwd.
_CALL_COUNT_FILE = os.path.abspath(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "..",
        "artifacts",
        ".mock_concurrent_mma_call_count",
    )
)

# Diagnostic log destination.
# NOTE(review): this is a hard-coded, machine-specific absolute path. It is
# kept byte-identical here because other tooling may read the log from this
# location; writes are best-effort and silently skipped when they fail.
_DIAG_LOG_PATH = b"C:\\projects\\manual_slop_tier2\\tests\\artifacts\\tier2_state\\fix_mma_concurrent_tracks_sim_20260627\\mock_diag.log"

# Extracts the ticket id from a worker prompt. Tracks A-C are accepted so the
# worker branch agrees with the sprint branch, which can emit ticket-C-1.
_TICKET_RE = re.compile(r"Ticket (ticket-[A-C]-1)", re.IGNORECASE)

# Track labels recognised in a sprint prompt, in priority order.
_TRACK_LABELS = ("A", "B", "C")


def _next_call_count() -> int:
    """Increment the persisted mock call count and return the new value.

    This is a plain read-modify-write on ``_CALL_COUNT_FILE`` with no
    locking, so it is NOT atomic across concurrent invocations. The value
    is only used for diagnostics.

    Returns:
        The incremented count, or 0 if the counter file could not be read,
        parsed, or written.
    """
    try:
        count = 0
        if os.path.exists(_CALL_COUNT_FILE):
            with open(_CALL_COUNT_FILE, "r", encoding="utf-8") as f:
                # An empty or whitespace-only file counts as zero.
                count = int(f.read().strip() or "0")
        count += 1
        os.makedirs(os.path.dirname(_CALL_COUNT_FILE), exist_ok=True)
        with open(_CALL_COUNT_FILE, "w", encoding="utf-8") as f:
            f.write(str(count))
        return count
    except (OSError, ValueError):
        # OSError: filesystem failure; ValueError: unparsable or undecodable
        # counter contents. Either way the counter is best-effort.
        return 0


def _diag(line: str) -> None:
    """Append ``line`` plus a newline to the diagnostic log, best-effort."""
    try:
        with open(_DIAG_LOG_PATH, "ab") as log:
            log.write(f"{line}\n".encode())
    except Exception:
        # Diagnostics must never break the mock: the log directory usually
        # does not exist outside the machine the path was written for.
        pass


def _emit_response(content: str, session_id: str, total_tokens: int) -> None:
    """Print the assistant message line followed by the result line.

    Args:
        content: Text placed in the message's ``content`` field.
        session_id: Session id reported in the result line.
        total_tokens: Reported total; input and output each get half.
    """
    input_tokens = total_tokens // 2
    print(json.dumps({
        "type": "message",
        "role": "assistant",
        "content": content,
    }), flush=True)
    print(json.dumps({
        "type": "result",
        "status": "success",
        "stats": {
            "total_tokens": total_tokens,
            "input_tokens": input_tokens,
            "output_tokens": total_tokens - input_tokens,
        },
        "session_id": session_id,
    }), flush=True)


def main() -> None:
    """Read the prompt from stdin and print the matching canned response.

    Routing is checked in this order: sprint planning, worker execution,
    epic initialization (any other non-empty prompt), then a default reply
    for an empty prompt.
    """
    # Read prompt from stdin.
    try:
        prompt = sys.stdin.read()
    except Exception:
        prompt = ""

    # Detect the session we're "resuming" via the --resume arg (set by the
    # gemini_cli_adapter on subsequent calls).
    session_id = ""
    argv = sys.argv[1:]
    if "--resume" in argv:
        i = argv.index("--resume")
        if i + 1 < len(argv):
            session_id = argv[i + 1]

    call_n = _next_call_count()
    _diag(f"[MOCK] call_n={call_n} session_id={session_id!r}")

    # 1. Sprint Planning (different tickets for different tracks).
    # Route on prompt content (the production passes the track_brief which
    # contains "Track A" or "Track B"). The prior session_id-based routing
    # was fragile because:
    #   1. The call_n counter is shared across tests in the same session, so
    #      call_n != 2 for the 1st sprint if a prior test ran.
    #   2. session_id="mock-sprint-A" means "this is a follow-up call after
    #      the 1st sprint returned mock-sprint-A", so the response should be
    #      sprint-B (2nd track), not sprint-A.
    # CHECK BEFORE epic so sprint takes priority over the catch-all epic
    # branch.
    if "generate the implementation tickets" in prompt:
        # First matching label wins; a prompt naming no track defaults to A.
        track_label = next(
            (label for label in _TRACK_LABELS if f"Track {label}" in prompt),
            "A",
        )
        _emit_sprint_ticket(track_label)
        _diag(f"[MOCK] ROUTED TO: sprint track={track_label}")
        return

    # 2. Worker Execution.
    # CHECK BEFORE epic so worker takes priority over the catch-all epic
    # branch.
    if "You are assigned to Ticket" in prompt:
        # NOTE: Routing into this branch no longer falls back to
        # session_id.startswith("mock-worker-"). The session_id persists
        # across tests in the same session (gemini_cli_adapter is a
        # singleton). That fallback caused
        # test_mma_concurrent_tracks_stress_sim to fail when it ran AFTER
        # test_mma_concurrent_tracks_execution: the execution test set the
        # session_id to mock-worker-ticket-A-1, and the stress test's epic
        # call used --resume with that session_id, which the fallback
        # incorrectly matched, returning a worker response instead of an
        # epic response. The session_id is now only consulted below to
        # recover the ticket id once the prompt has already matched.
        match = _TICKET_RE.search(prompt)
        if match:
            tid = match.group(1)
        elif session_id.startswith("mock-worker-"):
            tid = session_id[len("mock-worker-"):]
        else:
            tid = "unknown"
        _diag(f"[MOCK] ROUTED TO: worker tid={tid}")
        _emit_response(f"Working on {tid}. Done.", f"mock-worker-{tid}", 50)
        return

    # 3. Epic Initialization (catch-all for any non-empty prompt that does
    # not match the sprint or worker patterns above). This makes the mock
    # robust to test-specific epic prompts (e.g. 'STRESS TEST: TRACK A AND
    # TRACK B' used by test_mma_concurrent_tracks_stress_sim). The prior
    # version only matched 'PATH: Epic Initialization', so other prompts
    # fell to the Default branch and the production failed to parse the
    # response as JSON, returning 0 tracks.
    if prompt.strip():
        mock_response = [
            {"id": "track-a", "goal": "Track A Goal", "title": "Track A"},
            {"id": "track-b", "goal": "Track B Goal", "title": "Track B"},
        ]
        _emit_response(json.dumps(mock_response), "mock-epic", 100)
        _diag("[MOCK] ROUTED TO: epic_catchall")
        return

    # Default: only reached for an empty / whitespace-only prompt.
    _diag("[MOCK] ROUTED TO: default")
    _emit_response(
        f"Mock response. Received prompt: {prompt[:100]}...",
        "mock-default",
        10,
    )


def _emit_sprint_ticket(track_label: str) -> None:
    """Print a one-ticket sprint plan for the given track.

    Args:
        track_label: Track letter used in the ticket id, the ticket
            description, and the reported ``mock-sprint-<label>`` session id.
    """
    mock_response = [
        {
            "id": f"ticket-{track_label}-1",
            "description": f"Ticket {track_label} 1",
            "status": "todo",
            "assigned_to": "worker",
            "depends_on": [],
        }
    ]
    _emit_response(
        json.dumps(mock_response),
        f"mock-sprint-{track_label}",
        100,
    )


if __name__ == "__main__":
    main()