From e03681741abd57ea4d62df8e414ae73078a37b7c Mon Sep 17 00:00:00 2001 From: Ed_ Date: Wed, 10 Jun 2026 23:31:43 -0400 Subject: [PATCH] docs(mma-conductor): rewrite ExecutionEngine/ConductorEngine/WorkerPool/mma_exec sections to match current src/multi_agent_conductor.py (predates the conductor_engine refactor) --- docs/guide_multi_agent_conductor.md | 288 ++++++++++------------------ 1 file changed, 100 insertions(+), 188 deletions(-) diff --git a/docs/guide_multi_agent_conductor.md b/docs/guide_multi_agent_conductor.md index 7547062f..a37b61a7 100644 --- a/docs/guide_multi_agent_conductor.md +++ b/docs/guide_multi_agent_conductor.md @@ -165,229 +165,141 @@ def get_executable_tickets(track: "Track") -> list[Ticket]: ## The `ExecutionEngine` -The execution engine handles **state machine transitions** between Auto-Queue and Step Mode. +--- -### Execution Modes +## The `ExecutionEngine` (in `src/dag_engine.py:176-228`) -```python -class ExecutionMode(Enum): - AUTO_QUEUE = "auto_queue" # Autonomous worker spawning - STEP_MODE = "step_mode" # Explicit manual approval per transition -``` - -### `ExecutionEngine.tick()` - -The main loop, called by the conductor at a configurable interval (default 100ms): +`ExecutionEngine` is a **state machine facade around `TrackDAG`**, not an enum-driven auto/step dispatcher. There is no `ExecutionMode` enum; the engine takes a single `auto_queue: bool` flag. ```python class ExecutionEngine: - def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE): - self.dag = dag - self.mode = mode - self.pending_approval: list[str] = [] # Ticket IDs awaiting approval in Step Mode - - def tick(self) -> list[str]: - """Returns list of ticket IDs to dispatch in this tick.""" - if self.mode == ExecutionMode.AUTO_QUEUE: - return self.dag.ready_tickets() - elif self.mode == ExecutionMode.STEP_MODE: - # Only return one ticket at a time, requiring explicit approval - if self.pending_approval: - return self.pending_approval[:1] - ready = self.dag.ready_tickets() - if ready: - self.pending_approval = ready[:1] - return self.pending_approval + """A state machine that governs the progression of tasks within a TrackDAG. + Handles automatic queueing and manual task approval.""" + def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None: + self.dag = dag + self.auto_queue = auto_queue ``` -### Programmable Transitions +**Methods** (actual signatures): -```python -def set_mode(self, mode: ExecutionMode) -> None: - self.mode = mode - if mode == ExecutionMode.AUTO_QUEUE: - self.pending_approval.clear() +| Method | Returns | Purpose | +|---|---|---| +| `tick()` | `list[Ticket]` | Calls `dag.cascade_blocks()` then `dag.get_ready_tasks()`. Returns the ready list. **Does NOT auto-promote tickets to `in_progress`.** | +| `approve_task(task_id)` | `None` | Manual transition: `ticket.status` "todo" → "in_progress" IFF dependencies are met (`is_ticket_ready` returns True). | +| `update_task_status(task_id, status)` | `None` | Force-update a ticket's status (e.g. to "completed" or "blocked"). | -def approve_next(self) -> str | None: - """Returns the approved ticket ID, or None if nothing pending.""" - if not self.pending_approval: - return None - return self.pending_approval.pop(0) - -def reject_next(self) -> str | None: - """Returns the rejected ticket ID, marking it BLOCKED.""" - if not self.pending_approval: - return None - tid = self.pending_approval.pop(0) - self.dag.nodes[tid].status = "blocked" - return tid -``` - -These are exposed to the GUI as "Step Mode" controls and to the Hook API as `set_execution_mode` / `approve_ticket` / `reject_ticket`. +**Step Mode / Auto-Queue** is implemented at the **caller** layer (`ConductorEngine.run` in `multi_agent_conductor.py`), not inside `ExecutionEngine`. The `auto_queue` parameter is consulted there, not in `tick()`. The ConductorEngine pushes tickets to `in_progress` based on `auto_queue` and the per-ticket `step_mode` flag. --- -## The `MultiAgentConductor` (in `src/multi_agent_conductor.py`) +## The `ConductorEngine` (in `src/multi_agent_conductor.py:116+`) -### `__init__(self, controller: AppController)` +The actual class is named `ConductorEngine`, not `MultiAgentConductor`. It owns the DAG, the engine, and a `WorkerPool`; pushes state to the GUI; and runs the main async dispatch loop. + +### `__init__(track, event_queue=None, auto_queue=False, max_workers=4)` ```python -class MultiAgentConductor: - def __init__(self, controller: AppController): - self.controller = controller - self.dag_engine: ExecutionEngine | None = None - self.worker_pool: WorkerPool | None = None - self.current_track: Track | None = None - self.tier_assignments: dict[str, str] = {} # ticket_id -> "tier3-worker" - self.persona_overrides: dict[str, str] = {} # ticket_id -> persona_name - self._stop_event = threading.Event() - self._dispatch_thread: threading.Thread | None = None +class ConductorEngine: + def __init__( + self, + track: Track, + event_queue: Optional[events.AsyncEventQueue] = None, + auto_queue: bool = False, + max_workers: int = 4, + ) -> None: + self.track = track + self.event_queue = event_queue + self.tier_usage = { + "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None}, + "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None}, + "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None}, + "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None}, + } + self.dag = TrackDAG(self.track.tickets) + self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue) + self.pool = WorkerPool(max_workers=max_workers) + self._workers_lock = threading.Lock() + self._active_workers: dict[str, threading.Thread] = {} + self._abort_events: dict[str, threading.Event] = {} + self._pause_event: threading.Event = threading.Event() + self._tier_usage_lock = threading.Lock() + self._dirty: bool = True ``` -### `load_track(track_id: str) -> Track` +`max_workers` is **not** read from `config.toml` by the engine — it's a constructor parameter. The 3 call sites in `AppController` (`src/app_controller.py:4132-4133`, `4145-4146`, `4223-4224`) all read `[mma].max_workers` from TOML and pass it in. Default is 4. -Reads `conductor/tracks//plan.md` and parses the ticket list: +### Key methods + +| Method | Returns | Purpose | +|---|---|---| +| `update_usage(tier, input_tokens, output_tokens)` | `None` | Cumulative token accounting per tier (under `_tier_usage_lock`) | +| `pause()` / `resume()` | `None` | Set/clear `_pause_event` | +| `approve_task(task_id)` | `None` | Delegates to `engine.approve_task`; sets `_dirty = True` | +| `update_task_status(task_id, status)` | `None` | Delegates to `engine.update_task_status`; sets `_dirty = True` | +| `kill_worker(ticket_id)` | `None` | Sets the per-ticket abort event; joins the thread with 1.0s timeout | +| `_push_state(status, active_tier)` | `None` | Builds a payload dict and `await event_queue.put("mma_state_update", payload)` | +| `parse_json_tickets(json_str)` | `list[dict]` | Parses Tier 2 LLM output into a list of ticket dicts (the ingestion path) | +| `run()` | (async coroutine) | The main async dispatch loop; see below | + +### `run()` — The Main Async Dispatch Loop + +The actual `run()` is an `async` coroutine that runs the dispatch loop, NOT a `_dispatch_loop` background thread. It uses `asyncio` and `loop.run_in_executor` to bridge to the blocking `run_worker_lifecycle` call. + +### `_push_state()` payload shape ```python -def load_track(self, track_id: str) -> Track: - track_dir = self.controller.paths.tracks_dir / track_id - plan_path = track_dir / "plan.md" - tickets = parse_plan_md(plan_path) # Returns list[dict] - track = Track(id=track_id, tickets=tickets, plan_path=plan_path) - return track +async def _push_state(self, status: str = "running", active_tier: str = None) -> None: + if not self.event_queue: + return + payload = { + "status": status, + "active_tier": active_tier, + "tier_usage": self.tier_usage, + "track": {"id": self.track.id, "title": self.track.description}, + "tickets": [asdict(t) for t in self.track.tickets], + } + await self.event_queue.put("mma_state_update", payload) ``` -The track is then passed to the DAG engine: - -```python -def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None: - self.current_track = track - self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode) - self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency) - self.worker_pool.start() - self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True) - self._dispatch_thread.start() -``` - -### `_dispatch_loop` (Background Thread) - -```python -def _dispatch_loop(self) -> None: - while not self._stop_event.is_set(): - ready = self.dag_engine.tick() - for ticket_id in ready: - if self.worker_pool.has_capacity(): - self._spawn_worker(ticket_id) - time.sleep(0.1) # 100ms tick -``` - -The loop runs in a daemon thread. The main thread can call `self.stop()` to break out. - -### `_spawn_worker(ticket_id: str)` - -Spawns a worker via `mma_exec.py`: - -```python -def _spawn_worker(self, ticket_id: str) -> None: - ticket = self.current_track.get_ticket(ticket_id) - role = self.tier_assignments.get(ticket_id, "tier3-worker") - persona = self.persona_overrides.get(ticket_id) - - prompt = self._build_worker_prompt(ticket, persona) - context = self._gather_context(ticket) - - future = self.worker_pool.submit(role, prompt, context) - future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f)) -``` - -The `WorkerPool` is a `ThreadPoolExecutor`-backed pool with a semaphore for concurrency control. - -### `_on_worker_done(ticket_id, future)` - -Called when a worker finishes (success or failure): - -```python -def _on_worker_done(self, ticket_id: str, future: Future) -> None: - try: - result = future.result() - self.dag_engine.dag.mark_done(ticket_id) - self.dag_engine.dag.nodes[ticket_id].result = result - self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result) - except Exception as e: - self.dag_engine.dag.nodes[ticket_id].status = "blocked" - self.dag_engine.dag.nodes[ticket_id].error = str(e) - self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e)) - # Re-evaluate downstream blocking - for downstream in self.dag_engine.dag.reverse_edges[ticket_id]: - self.dag_engine.dag.mark_done(downstream) -``` - -### `_emit_event(type, *args)` - -Pushes an event to the controller's `event_queue` for the GUI to consume: - -```python -def _emit_event(self, event_type: str, *args) -> None: - self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time())) -``` - -The GUI polls `controller.event_queue.get_all()` once per frame and dispatches to render functions. - --- -## The `WorkerPool` +## The `WorkerPool` (in `src/multi_agent_conductor.py:50-114`) -A thin wrapper around `ThreadPoolExecutor` with concurrency limiting: +A `dict[str, Thread]` + `threading.Lock` + `threading.Semaphore`, NOT a `ThreadPoolExecutor` wrapper. ```python class WorkerPool: - def __init__(self, max_concurrency: int = 4): - self.max_concurrency = max_concurrency - self.semaphore = threading.Semaphore(max_concurrency) - self.executor = ThreadPoolExecutor(max_workers=max_concurrency) - self.active_workers: set[str] = set() # ticket_ids currently running - - def submit(self, role: str, prompt: str, context: dict) -> Future: - """Submit a worker task. Returns a Future for the result.""" - def _wrapped(): - with self.semaphore: - self.active_workers.add(context["ticket_id"]) - try: - return run_mma_worker(role, prompt, context) - finally: - self.active_workers.discard(context["ticket_id"]) - return self.executor.submit(_wrapped) - - def has_capacity(self) -> bool: - return len(self.active_workers) < self.max_concurrency - - def stop(self, wait: bool = True) -> None: - self.executor.shutdown(wait=wait) + def __init__(self, max_workers: int = 4): + self.max_workers = max_workers + self._active: dict[str, threading.Thread] = {} + self._lock = threading.Lock() + self._semaphore = threading.Semaphore(max_workers) ``` -### `run_mma_worker` +**Methods** (actual signatures): -Invokes `mma_exec.py` as a subprocess (not in-process) to enforce **Context Amnesia** — the sub-agent has zero state from the parent: +| Method | Returns | Purpose | +|---|---|---| +| `spawn(ticket_id, target, args=())` | `None` | Spawns a daemon thread running `target(*args)` if the pool has capacity. No-op if full. Tracks the thread in `self._active[ticket_id]`. | +| `join_all(timeout=None)` | `None` | Joins every active thread. | +| `get_active_count()` | `int` | Returns `len(self._active)`. | +| `is_full()` | `bool` | True if `get_active_count() >= self.max_workers`. | -```python -def run_mma_worker(role: str, prompt: str, context: dict) -> dict: - """Spawn a fresh tier3-worker sub-agent for the ticket.""" - cmd = [ - sys.executable, "scripts/mma_exec.py", - "--role", role, - "--ticket-id", context["ticket_id"], - ] - # Pipe prompt via stdin - proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8") - stdout, stderr = proc.communicate(payload, timeout=600) - if proc.returncode != 0: - raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}") - return json.loads(stdout.decode("utf-8")) +**Concurrency control is a `Semaphore`**, not a thread-pool executor. The semaphore is acquired in `spawn` and released in the spawned thread's `_run` wrapper. `self._active` is the source of truth for `get_active_count`/`is_full`; the semaphore prevents over-spawning. + +--- + +## Sub-Agent Invocation (`mma_exec.py`) + +The ConductorEngine does **not** spawn `mma_exec.py` directly. Sub-agent invocation is a **synchronous CLI bridge** at `scripts/mma_exec.py` invoked from a Tier 3 worker (see [conductor/workflow.md](../../conductor/workflow.md) "MMA Bridge" section). Each sub-agent is invoked via: + +```bash +uv run python scripts/mma_exec.py --role tier3-worker "[PROMPT]" ``` -This is the **Token Firewall** in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice. +The `--role` flag selects between `tier1-orchestrator`, `tier2-tech-lead`, `tier3-worker`, and `tier4-qa`. Sub-agents receive context via stdin (or as additional CLI args) and exit after one round-trip. The actual prompt construction lives in `run_worker_lifecycle` at `src/multi_agent_conductor.py` (the free function referenced by both `ConductorEngine.run` and the worker spawn flow). +The "Token Firewall" effect — each worker starts with a clean context window — is achieved by the `ai_client.reset_session()` call at the start of `run_worker_lifecycle` (see [guide_mma.md](guide_mma.md) "Context Amnesia"). --- ## Track Loading