docs(mma-conductor): rewrite ExecutionEngine/ConductorEngine/WorkerPool/mma_exec sections to match current src/multi_agent_conductor.py (predates the conductor_engine refactor)
This commit is contained in:
@@ -165,229 +165,141 @@ def get_executable_tickets(track: "Track") -> list[Ticket]:
|
|||||||
|
|
||||||
## The `ExecutionEngine`
|
## The `ExecutionEngine`
|
||||||
|
|
||||||
The execution engine handles **state machine transitions** between Auto-Queue and Step Mode.
|
---
|
||||||
|
|
||||||
### Execution Modes
|
## The `ExecutionEngine` (in `src/dag_engine.py:176-228`)
|
||||||
|
|
||||||
```python
|
`ExecutionEngine` is a **state machine facade around `TrackDAG`**, not an enum-driven auto/step dispatcher. There is no `ExecutionMode` enum; the engine takes a single `auto_queue: bool` flag.
|
||||||
class ExecutionMode(Enum):
|
|
||||||
AUTO_QUEUE = "auto_queue" # Autonomous worker spawning
|
|
||||||
STEP_MODE = "step_mode" # Explicit manual approval per transition
|
|
||||||
```
|
|
||||||
|
|
||||||
### `ExecutionEngine.tick()`
|
|
||||||
|
|
||||||
The main loop, called by the conductor at a configurable interval (default 100ms):
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
class ExecutionEngine:
|
class ExecutionEngine:
|
||||||
def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE):
|
"""A state machine that governs the progression of tasks within a TrackDAG.
|
||||||
|
Handles automatic queueing and manual task approval."""
|
||||||
|
def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
|
||||||
self.dag = dag
|
self.dag = dag
|
||||||
self.mode = mode
|
self.auto_queue = auto_queue
|
||||||
self.pending_approval: list[str] = [] # Ticket IDs awaiting approval in Step Mode
|
|
||||||
|
|
||||||
def tick(self) -> list[str]:
|
|
||||||
"""Returns list of ticket IDs to dispatch in this tick."""
|
|
||||||
if self.mode == ExecutionMode.AUTO_QUEUE:
|
|
||||||
return self.dag.ready_tickets()
|
|
||||||
elif self.mode == ExecutionMode.STEP_MODE:
|
|
||||||
# Only return one ticket at a time, requiring explicit approval
|
|
||||||
if self.pending_approval:
|
|
||||||
return self.pending_approval[:1]
|
|
||||||
ready = self.dag.ready_tickets()
|
|
||||||
if ready:
|
|
||||||
self.pending_approval = ready[:1]
|
|
||||||
return self.pending_approval
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Programmable Transitions
|
**Methods** (actual signatures):
|
||||||
|
|
||||||
```python
|
| Method | Returns | Purpose |
|
||||||
def set_mode(self, mode: ExecutionMode) -> None:
|
|---|---|---|
|
||||||
self.mode = mode
|
| `tick()` | `list[Ticket]` | Calls `dag.cascade_blocks()` then `dag.get_ready_tasks()`. Returns the ready list. **Does NOT auto-promote tickets to `in_progress`.** |
|
||||||
if mode == ExecutionMode.AUTO_QUEUE:
|
| `approve_task(task_id)` | `None` | Manual transition: `ticket.status` "todo" → "in_progress" IFF dependencies are met (`is_ticket_ready` returns True). |
|
||||||
self.pending_approval.clear()
|
| `update_task_status(task_id, status)` | `None` | Force-update a ticket's status (e.g. to "completed" or "blocked"). |
|
||||||
|
|
||||||
def approve_next(self) -> str | None:
|
**Step Mode / Auto-Queue** is implemented at the **caller** layer (`ConductorEngine.run` in `multi_agent_conductor.py`), not inside `ExecutionEngine`. The `auto_queue` parameter is consulted there, not in `tick()`. The ConductorEngine pushes tickets to `in_progress` based on `auto_queue` and the per-ticket `step_mode` flag.
|
||||||
"""Returns the approved ticket ID, or None if nothing pending."""
|
|
||||||
if not self.pending_approval:
|
|
||||||
return None
|
|
||||||
return self.pending_approval.pop(0)
|
|
||||||
|
|
||||||
def reject_next(self) -> str | None:
|
|
||||||
"""Returns the rejected ticket ID, marking it BLOCKED."""
|
|
||||||
if not self.pending_approval:
|
|
||||||
return None
|
|
||||||
tid = self.pending_approval.pop(0)
|
|
||||||
self.dag.nodes[tid].status = "blocked"
|
|
||||||
return tid
|
|
||||||
```
|
|
||||||
|
|
||||||
These are exposed to the GUI as "Step Mode" controls and to the Hook API as `set_execution_mode` / `approve_ticket` / `reject_ticket`.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## The `MultiAgentConductor` (in `src/multi_agent_conductor.py`)
|
## The `ConductorEngine` (in `src/multi_agent_conductor.py:116+`)
|
||||||
|
|
||||||
### `__init__(self, controller: AppController)`
|
The actual class is named `ConductorEngine`, not `MultiAgentConductor`. It owns the DAG, the engine, and a `WorkerPool`; pushes state to the GUI; and runs the main async dispatch loop.
|
||||||
|
|
||||||
|
### `__init__(track, event_queue=None, auto_queue=False, max_workers=4)`
|
||||||
|
|
||||||
```python
|
```python
|
||||||
class MultiAgentConductor:
|
class ConductorEngine:
|
||||||
def __init__(self, controller: AppController):
|
def __init__(
|
||||||
self.controller = controller
|
self,
|
||||||
self.dag_engine: ExecutionEngine | None = None
|
track: Track,
|
||||||
self.worker_pool: WorkerPool | None = None
|
event_queue: Optional[events.AsyncEventQueue] = None,
|
||||||
self.current_track: Track | None = None
|
auto_queue: bool = False,
|
||||||
self.tier_assignments: dict[str, str] = {} # ticket_id -> "tier3-worker"
|
max_workers: int = 4,
|
||||||
self.persona_overrides: dict[str, str] = {} # ticket_id -> persona_name
|
) -> None:
|
||||||
self._stop_event = threading.Event()
|
self.track = track
|
||||||
self._dispatch_thread: threading.Thread | None = None
|
self.event_queue = event_queue
|
||||||
|
self.tier_usage = {
|
||||||
|
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
|
||||||
|
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
|
||||||
|
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||||
|
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
|
||||||
|
}
|
||||||
|
self.dag = TrackDAG(self.track.tickets)
|
||||||
|
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
|
||||||
|
self.pool = WorkerPool(max_workers=max_workers)
|
||||||
|
self._workers_lock = threading.Lock()
|
||||||
|
self._active_workers: dict[str, threading.Thread] = {}
|
||||||
|
self._abort_events: dict[str, threading.Event] = {}
|
||||||
|
self._pause_event: threading.Event = threading.Event()
|
||||||
|
self._tier_usage_lock = threading.Lock()
|
||||||
|
self._dirty: bool = True
|
||||||
```
|
```
|
||||||
|
|
||||||
### `load_track(track_id: str) -> Track`
|
`max_workers` is **not** read from `config.toml` by the engine — it's a constructor parameter. The 3 call sites in `AppController` (`src/app_controller.py:4132-4133`, `4145-4146`, `4223-4224`) all read `[mma].max_workers` from TOML and pass it in. Default is 4.
|
||||||
|
|
||||||
Reads `conductor/tracks/<track_id>/plan.md` and parses the ticket list:
|
### Key methods
|
||||||
|
|
||||||
|
| Method | Returns | Purpose |
|
||||||
|
|---|---|---|
|
||||||
|
| `update_usage(tier, input_tokens, output_tokens)` | `None` | Cumulative token accounting per tier (under `_tier_usage_lock`) |
|
||||||
|
| `pause()` / `resume()` | `None` | Set/clear `_pause_event` |
|
||||||
|
| `approve_task(task_id)` | `None` | Delegates to `engine.approve_task`; sets `_dirty = True` |
|
||||||
|
| `update_task_status(task_id, status)` | `None` | Delegates to `engine.update_task_status`; sets `_dirty = True` |
|
||||||
|
| `kill_worker(ticket_id)` | `None` | Sets the per-ticket abort event; joins the thread with 1.0s timeout |
|
||||||
|
| `_push_state(status, active_tier)` | `None` | Builds a payload dict and `await event_queue.put("mma_state_update", payload)` |
|
||||||
|
| `parse_json_tickets(json_str)` | `list[dict]` | Parses Tier 2 LLM output into a list of ticket dicts (the ingestion path) |
|
||||||
|
| `run()` | (async coroutine) | The main async dispatch loop; see below |
|
||||||
|
|
||||||
|
### `run()` — The Main Async Dispatch Loop
|
||||||
|
|
||||||
|
The actual `run()` is an `async` coroutine that runs the dispatch loop, NOT a `_dispatch_loop` background thread. It uses `asyncio` and `loop.run_in_executor` to bridge to the blocking `run_worker_lifecycle` call.
|
||||||
|
|
||||||
|
### `_push_state()` payload shape
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def load_track(self, track_id: str) -> Track:
|
async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
|
||||||
track_dir = self.controller.paths.tracks_dir / track_id
|
if not self.event_queue:
|
||||||
plan_path = track_dir / "plan.md"
|
return
|
||||||
tickets = parse_plan_md(plan_path) # Returns list[dict]
|
payload = {
|
||||||
track = Track(id=track_id, tickets=tickets, plan_path=plan_path)
|
"status": status,
|
||||||
return track
|
"active_tier": active_tier,
|
||||||
|
"tier_usage": self.tier_usage,
|
||||||
|
"track": {"id": self.track.id, "title": self.track.description},
|
||||||
|
"tickets": [asdict(t) for t in self.track.tickets],
|
||||||
|
}
|
||||||
|
await self.event_queue.put("mma_state_update", payload)
|
||||||
```
|
```
|
||||||
|
|
||||||
The track is then passed to the DAG engine:
|
|
||||||
|
|
||||||
```python
|
|
||||||
def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None:
|
|
||||||
self.current_track = track
|
|
||||||
self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode)
|
|
||||||
self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency)
|
|
||||||
self.worker_pool.start()
|
|
||||||
self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True)
|
|
||||||
self._dispatch_thread.start()
|
|
||||||
```
|
|
||||||
|
|
||||||
### `_dispatch_loop` (Background Thread)
|
|
||||||
|
|
||||||
```python
|
|
||||||
def _dispatch_loop(self) -> None:
|
|
||||||
while not self._stop_event.is_set():
|
|
||||||
ready = self.dag_engine.tick()
|
|
||||||
for ticket_id in ready:
|
|
||||||
if self.worker_pool.has_capacity():
|
|
||||||
self._spawn_worker(ticket_id)
|
|
||||||
time.sleep(0.1) # 100ms tick
|
|
||||||
```
|
|
||||||
|
|
||||||
The loop runs in a daemon thread. The main thread can call `self.stop()` to break out.
|
|
||||||
|
|
||||||
### `_spawn_worker(ticket_id: str)`
|
|
||||||
|
|
||||||
Spawns a worker via `mma_exec.py`:
|
|
||||||
|
|
||||||
```python
|
|
||||||
def _spawn_worker(self, ticket_id: str) -> None:
|
|
||||||
ticket = self.current_track.get_ticket(ticket_id)
|
|
||||||
role = self.tier_assignments.get(ticket_id, "tier3-worker")
|
|
||||||
persona = self.persona_overrides.get(ticket_id)
|
|
||||||
|
|
||||||
prompt = self._build_worker_prompt(ticket, persona)
|
|
||||||
context = self._gather_context(ticket)
|
|
||||||
|
|
||||||
future = self.worker_pool.submit(role, prompt, context)
|
|
||||||
future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f))
|
|
||||||
```
|
|
||||||
|
|
||||||
The `WorkerPool` is a `ThreadPoolExecutor`-backed pool with a semaphore for concurrency control.
|
|
||||||
|
|
||||||
### `_on_worker_done(ticket_id, future)`
|
|
||||||
|
|
||||||
Called when a worker finishes (success or failure):
|
|
||||||
|
|
||||||
```python
|
|
||||||
def _on_worker_done(self, ticket_id: str, future: Future) -> None:
|
|
||||||
try:
|
|
||||||
result = future.result()
|
|
||||||
self.dag_engine.dag.mark_done(ticket_id)
|
|
||||||
self.dag_engine.dag.nodes[ticket_id].result = result
|
|
||||||
self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result)
|
|
||||||
except Exception as e:
|
|
||||||
self.dag_engine.dag.nodes[ticket_id].status = "blocked"
|
|
||||||
self.dag_engine.dag.nodes[ticket_id].error = str(e)
|
|
||||||
self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e))
|
|
||||||
# Re-evaluate downstream blocking
|
|
||||||
for downstream in self.dag_engine.dag.reverse_edges[ticket_id]:
|
|
||||||
self.dag_engine.dag.mark_done(downstream)
|
|
||||||
```
|
|
||||||
|
|
||||||
### `_emit_event(type, *args)`
|
|
||||||
|
|
||||||
Pushes an event to the controller's `event_queue` for the GUI to consume:
|
|
||||||
|
|
||||||
```python
|
|
||||||
def _emit_event(self, event_type: str, *args) -> None:
|
|
||||||
self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time()))
|
|
||||||
```
|
|
||||||
|
|
||||||
The GUI polls `controller.event_queue.get_all()` once per frame and dispatches to render functions.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## The `WorkerPool`
|
## The `WorkerPool` (in `src/multi_agent_conductor.py:50-114`)
|
||||||
|
|
||||||
A thin wrapper around `ThreadPoolExecutor` with concurrency limiting:
|
A `dict[str, Thread]` + `threading.Lock` + `threading.Semaphore`, NOT a `ThreadPoolExecutor` wrapper.
|
||||||
|
|
||||||
```python
|
```python
|
||||||
class WorkerPool:
|
class WorkerPool:
|
||||||
def __init__(self, max_concurrency: int = 4):
|
def __init__(self, max_workers: int = 4):
|
||||||
self.max_concurrency = max_concurrency
|
self.max_workers = max_workers
|
||||||
self.semaphore = threading.Semaphore(max_concurrency)
|
self._active: dict[str, threading.Thread] = {}
|
||||||
self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
|
self._lock = threading.Lock()
|
||||||
self.active_workers: set[str] = set() # ticket_ids currently running
|
self._semaphore = threading.Semaphore(max_workers)
|
||||||
|
|
||||||
def submit(self, role: str, prompt: str, context: dict) -> Future:
|
|
||||||
"""Submit a worker task. Returns a Future for the result."""
|
|
||||||
def _wrapped():
|
|
||||||
with self.semaphore:
|
|
||||||
self.active_workers.add(context["ticket_id"])
|
|
||||||
try:
|
|
||||||
return run_mma_worker(role, prompt, context)
|
|
||||||
finally:
|
|
||||||
self.active_workers.discard(context["ticket_id"])
|
|
||||||
return self.executor.submit(_wrapped)
|
|
||||||
|
|
||||||
def has_capacity(self) -> bool:
|
|
||||||
return len(self.active_workers) < self.max_concurrency
|
|
||||||
|
|
||||||
def stop(self, wait: bool = True) -> None:
|
|
||||||
self.executor.shutdown(wait=wait)
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### `run_mma_worker`
|
**Methods** (actual signatures):
|
||||||
|
|
||||||
Invokes `mma_exec.py` as a subprocess (not in-process) to enforce **Context Amnesia** — the sub-agent has zero state from the parent:
|
| Method | Returns | Purpose |
|
||||||
|
|---|---|---|
|
||||||
|
| `spawn(ticket_id, target, args=())` | `None` | Spawns a daemon thread running `target(*args)` if the pool has capacity. No-op if full. Tracks the thread in `self._active[ticket_id]`. |
|
||||||
|
| `join_all(timeout=None)` | `None` | Joins every active thread. |
|
||||||
|
| `get_active_count()` | `int` | Returns `len(self._active)`. |
|
||||||
|
| `is_full()` | `bool` | True if `get_active_count() >= self.max_workers`. |
|
||||||
|
|
||||||
```python
|
**Concurrency control is a `Semaphore`**, not a thread-pool executor. The semaphore is acquired in `spawn` and released in the spawned thread's `_run` wrapper. `self._active` is the source of truth for `get_active_count`/`is_full`; the semaphore prevents over-spawning.
|
||||||
def run_mma_worker(role: str, prompt: str, context: dict) -> dict:
|
|
||||||
"""Spawn a fresh tier3-worker sub-agent for the ticket."""
|
---
|
||||||
cmd = [
|
|
||||||
sys.executable, "scripts/mma_exec.py",
|
## Sub-Agent Invocation (`mma_exec.py`)
|
||||||
"--role", role,
|
|
||||||
"--ticket-id", context["ticket_id"],
|
The ConductorEngine does **not** spawn `mma_exec.py` directly. Sub-agent invocation is a **synchronous CLI bridge** at `scripts/mma_exec.py` invoked from a Tier 3 worker (see [conductor/workflow.md](../../conductor/workflow.md) "MMA Bridge" section). Each sub-agent is invoked via:
|
||||||
]
|
|
||||||
# Pipe prompt via stdin
|
```bash
|
||||||
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
uv run python scripts/mma_exec.py --role tier3-worker "[PROMPT]"
|
||||||
payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
|
|
||||||
stdout, stderr = proc.communicate(payload, timeout=600)
|
|
||||||
if proc.returncode != 0:
|
|
||||||
raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}")
|
|
||||||
return json.loads(stdout.decode("utf-8"))
|
|
||||||
```
|
```
|
||||||
|
|
||||||
This is the **Token Firewall** in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice.
|
The `--role` flag selects between `tier1-orchestrator`, `tier2-tech-lead`, `tier3-worker`, and `tier4-qa`. Sub-agents receive context via stdin (or as additional CLI args) and exit after one round-trip. The actual prompt construction lives in `run_worker_lifecycle` at `src/multi_agent_conductor.py` (the free function referenced by both `ConductorEngine.run` and the worker spawn flow).
|
||||||
|
|
||||||
|
The "Token Firewall" effect — each worker starts with a clean context window — is achieved by the `ai_client.reset_session()` call at the start of `run_worker_lifecycle` (see [guide_mma.md](guide_mma.md) "Context Amnesia").
|
||||||
---
|
---
|
||||||
|
|
||||||
## Track Loading
|
## Track Loading
|
||||||
|
|||||||
Reference in New Issue
Block a user