fix(mma): Change self.engine to self.engines dict for concurrent track support

- self.engine was a single ConductorEngine reference that got overwritten
  when multiple tracks ran concurrently, orphaning the first track's engine
- Now uses self.engines: Dict[str, ConductorEngine] keyed by track.id
- Updated _spawn_worker, kill_worker, pause_mma, resume_mma, approve_ticket,
  _load_active_tickets, and _update_ticket_depends_on to use engines.get(track_id)

Fixes concurrent MMA track execution bug where only one worker ever appeared.
This commit is contained in:
2026-05-07 07:54:39 -04:00
parent 9099b02002
commit ac0b564c02
7 changed files with 60 additions and 5511 deletions
+22 -17
View File
@@ -160,8 +160,7 @@ class AppController:
self._loop_thread: Optional[threading.Thread] = None
self.tracks: List[Dict[str, Any]] = []
self.active_track: Optional[models.Track] = None
self.engine: Optional[multi_agent_conductor.ConductorEngine] = None
self.active_tickets: List[Dict[str, Any]] = []
self.engines: Dict[str, multi_agent_conductor.ConductorEngine] = {}
self.mma_streams: Dict[str, str] = {}
self._worker_status: Dict[str, str] = {} # stream_id -> "running" | "completed" | "failed" | "killed"
self.MAX_STREAM_SIZE: int = 10 * 1024 # 10KB max per stream
@@ -2917,7 +2916,7 @@ class AppController:
# Use the active track object directly to start execution
self._set_mma_status("running")
engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode)
self.engine = engine
self.engines[self.active_track.id] = engine
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
full_md, _, _ = aggregate.run(flat)
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
@@ -2991,7 +2990,7 @@ class AppController:
self._pending_gui_tasks.append({'action': 'refresh_from_project'})
# 4. Initialize ConductorEngine and run loop
engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode)
self.engine = engine
self.engines[self.active_track.id] = engine
# Use current full markdown context for the track execution
track_id_param = track.id
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param)
@@ -3018,31 +3017,35 @@ class AppController:
def _spawn_worker(self, ticket_id: str, data: dict = None) -> None:
"""Manually initiates a sub-agent execution for a ticket."""
if self.engine:
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine:
for t in self.active_track.tickets:
if t.id == ticket_id:
t.status = "todo"
t.step_mode = False
break
self.engine.engine.auto_queue = True
engine.engine.auto_queue = True
self.event_queue.put("mma_retry", {"ticket_id": ticket_id})
def kill_worker(self, worker_id: str) -> None:
"""Aborts a running worker."""
if self.engine:
self.engine.kill_worker(worker_id)
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine:
engine.kill_worker(worker_id)
def pause_mma(self) -> None:
"""Pauses the global MMA loop."""
self.mma_step_mode = True
if self.engine:
self.engine.pause()
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine:
engine.pause()
def resume_mma(self) -> None:
"""Resumes the global MMA loop."""
self.mma_step_mode = False
if self.engine:
self.engine.resume()
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine:
engine.resume()
def inject_context(self, data: dict) -> None:
"""Programmatic context injection."""
@@ -3058,8 +3061,9 @@ class AppController:
def approve_ticket(self, ticket_id: str) -> None:
"""Manually approves a ticket for execution."""
if self.engine and self.engine.engine:
self.engine.engine.approve_task(ticket_id)
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine and engine.engine:
engine.engine.approve_task(ticket_id)
else:
# Fallback if engine not running
for t in self.active_tickets:
@@ -3082,10 +3086,11 @@ class AppController:
if t.id == ticket_id:
t.depends_on = depends_on
break
if self.engine:
engine = self.engines.get(self.active_track.id if self.active_track else None)
if engine:
from src.dag_engine import TrackDAG, ExecutionEngine
self.engine.dag = TrackDAG(self.active_track.tickets)
self.engine.engine = ExecutionEngine(self.engine.dag, auto_queue=self.engine.engine.auto_queue)
engine.dag = TrackDAG(self.active_track.tickets)
engine.engine = ExecutionEngine(engine.dag, auto_queue=engine.engine.auto_queue)
self._push_mma_state_update()
def _cb_run_conductor_setup(self) -> None: