From b043d06771207e49ef475688bcdd0c0a470722c8 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Thu, 7 May 2026 18:37:19 -0400 Subject: [PATCH] chore: add standard STATUS markers to worker streams and optimize test polling This fixes the 'stuck' behavior in concurrent tests by ensuring the tests look for standard completion markers and don't wait for unnecessary timeouts. --- src/multi_agent_conductor.py | 4 ++++ tests/test_mma_concurrent_tracks_sim.py | 4 ++-- tests/test_mma_concurrent_tracks_stress_sim.py | 7 +++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/multi_agent_conductor.py b/src/multi_agent_conductor.py index 532afe1..7b91486 100644 --- a/src/multi_agent_conductor.py +++ b/src/multi_agent_conductor.py @@ -599,8 +599,12 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: engine._dirty = True if "BLOCKED" in response.upper(): ticket.mark_blocked(response) + if event_queue: + _queue_put(event_queue, "response", {"text": f"\n\n[STATUS] BLOCKED: {response}", "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done"}) else: ticket.mark_complete() + if event_queue: + _queue_put(event_queue, "response", {"text": "\n\n[STATUS] COMPLETED", "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done"}) if event_queue: _queue_put(event_queue, "ticket_completed", {"ticket_id": ticket.id, "timestamp": time.time()}) diff --git a/tests/test_mma_concurrent_tracks_sim.py b/tests/test_mma_concurrent_tracks_sim.py index 819ee6d..b86850a 100644 --- a/tests/test_mma_concurrent_tracks_sim.py +++ b/tests/test_mma_concurrent_tracks_sim.py @@ -109,12 +109,12 @@ def test_mma_concurrent_tracks_execution(live_gui) -> None: # Check stream content for completion marker from our mock for sid, text in workers.items(): if "ticket-A-1" in sid: - if "Done." in text: + if "Done." in text or "[STATUS] COMPLETED" in text: completed_a = True else: if i % 10 == 0: print(f"[SIM] t={i}s: Stream A: {text[:50]}...") if "ticket-B-1" in sid: - if "Done." in text: + if "Done." in text or "[STATUS] COMPLETED" in text: completed_b = True else: if i % 10 == 0: print(f"[SIM] t={i}s: Stream B: {text[:50]}...") diff --git a/tests/test_mma_concurrent_tracks_stress_sim.py b/tests/test_mma_concurrent_tracks_stress_sim.py index cba4697..718ede0 100644 --- a/tests/test_mma_concurrent_tracks_stress_sim.py +++ b/tests/test_mma_concurrent_tracks_stress_sim.py @@ -72,7 +72,7 @@ def test_mma_concurrent_tracks_stress(live_gui) -> None: # 5. Verify workers from both tracks appear # Workers are named "Tier 3 (Worker): " ok, workers = _poll_mma_workers(client, timeout=30, label="wait-workers", - condition=lambda w: any(track_id_a in str(k) for k in w) and any(track_id_b in str(k) for k in w)) + condition=lambda w: any("ticket-A-1" in str(k) for k in w) and any("ticket-B-1" in str(k) for k in w)) # Note: ticket_id might not contain track_id directly, but the mock epic generator # usually includes some identifier. If not, we just check for multiple Tier 3 workers. @@ -86,7 +86,10 @@ def test_mma_concurrent_tracks_stress(live_gui) -> None: # 6. Wait for completion print("[SIM] Waiting for all workers to finish...") ok, workers = _poll_mma_workers(client, timeout=120, label="wait-completion", - condition=lambda w: all("COMPLETED" in str(v) or "FAILED" in str(v) for v in w.values()) if w else False) + condition=lambda w: all("[STATUS] COMPLETED" in str(v) or "FAILED" in str(v) or "BLOCKED" in str(v) + for k, v in w.items() if "Tier 3" in k) if any("Tier 3" in k for k in w) else False) + + assert ok, f"Workers did not complete. Active: {list(workers.keys())}" # Final check: GUI should still be responsive res = client.get_status()