# SSDL Report: Multi-Agent Conductor DAG Execution Loop

**Track/Context:** Technical Architecture Reference

**Date:** 2026-06-13

**Status:** Completed

**Subject:** SSDL trace and architectural analysis of the Conductor Engine DAG execution loop.
|
||||
|
||||
---
|
||||
|
||||
## 1. Architectural Overview
|
||||
|
||||
The **Conductor Engine** ([src/multi_agent_conductor.py](file:///C:/projects/manual_slop/src/multi_agent_conductor.py)) drives the execution of tiered multi-agent tracks. It operates as a task orchestrator that parses hierarchical ticket plans, constructs a Directed Acyclic Graph (DAG) using the `TrackDAG` engine ([src/dag_engine.py](file:///C:/projects/manual_slop/src/dag_engine.py)), and ticks the execution state machine.
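
The report does not show how `TrackDAG` orders tickets or detects the cycle errors mentioned in Step 3. Here is a hedged sketch that assumes each ticket is reduced to a mapping from its ID to its parent IDs (a hypothetical shape, not the project's schema). Under that assumption, Kahn's algorithm yields a dependency-respecting order and isolates every ticket that can never become ready. `topo_order` is an illustrative name only.

```python
from collections import deque


def topo_order(deps: dict[str, list[str]]) -> tuple[list[str], set[str]]:
    """Kahn's algorithm: return (execution order, tickets that can never run).

    `deps` maps each ticket ID to the IDs of its parent tickets (hypothetical shape).
    """
    indegree = {tid: 0 for tid in deps}  # number of parents per ticket
    children: dict[str, list[str]] = {tid: [] for tid in deps}
    for tid, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise KeyError(f"{tid} depends on unknown ticket {parent}")
            indegree[tid] += 1
            children[parent].append(tid)

    # Seed with parentless tickets, then release children as their parents are emitted.
    ready = deque(tid for tid, deg in indegree.items() if deg == 0)
    order: list[str] = []
    while ready:
        tid = ready.popleft()
        order.append(tid)
        for child in children[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Tickets never emitted sit on a cycle or depend on one.
    return order, set(deps) - set(order)


order, stuck = topo_order({"t1": [], "t2": ["t1"], "t3": ["t1", "t2"]})
print(order, stuck)  # ['t1', 't2', 't3'] set()
```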

The engine's core loop (`ConductorEngine.run`) coordinates concurrent worker threads (via the `WorkerPool`), manages step-by-step human approvals, escalates to stronger models on retries, and moves the track between the running, paused, blocked, and completed states.
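
The report names `WorkerPool` and the `is_full()`, `spawn`, and `join_all` operations that the loop relies on, but it does not describe their internals. What follows is a minimal thread-based sketch of that contract. The `max_workers` cap, the per-ticket bookkeeping, and the `spawn(ticket_id, target, *args)` signature are assumptions rather than the project's API.

```python
import threading
from typing import Any, Callable


class WorkerPool:
    """Hypothetical bounded pool exposing the is_full/spawn/join_all calls the report names."""

    def __init__(self, max_workers: int) -> None:
        self.max_workers = max_workers
        self._threads: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()

    def _prune(self) -> None:
        # Drop finished threads so their slots become available again (caller holds the lock).
        self._threads = {tid: t for tid, t in self._threads.items() if t.is_alive()}

    def is_full(self) -> bool:
        with self._lock:
            self._prune()
            return len(self._threads) >= self.max_workers

    def spawn(self, ticket_id: str, target: Callable[..., Any], *args: Any) -> bool:
        """Start `target(*args)` on a new thread; return False if no slot is free."""
        with self._lock:
            self._prune()
            if len(self._threads) >= self.max_workers:
                return False  # the loop simply retries this ticket on a later tick
            thread = threading.Thread(target=target, args=args, name=f"worker-{ticket_id}")
            self._threads[ticket_id] = thread
            thread.start()
            return True

    def join_all(self) -> None:
        """Block until every worker started so far has finished."""
        with self._lock:
            threads = list(self._threads.values())
        for thread in threads:
            thread.join()
```

Returning `False` from `spawn` instead of blocking mirrors the capacity rule in Step 4, where a full pool simply defers spawning to the next tick.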

---

## 2. SSDL Topology Diagram

The diagram below traces the SSDL execution shapes (`[I]`, `->`, `[Q]`, `[S]`, `[B]`, `[T]`, `[M]`, `o->`) through the main executor loop:

```
====================================================================================
                          CONDUCTOR ENGINE EXECUTION LOOP
====================================================================================

[Conductor Loop Entry]
          │
          ▼
o-> [Q:_pause_event]
          │
          ├─ [B:paused?] ─── yes ───► [I:_push_state("paused")] ──► (sleep 0.5s) ──┐
          │                                                                        │
          └─ no                                                                    │
             │                                                                     │
             ▼                                                                     │
    [I:self.engine.tick] (recompute ready tasks)                                   │
             │                                                                     │
             ▼                                                                     │
   [B:ready_tasks empty?]                                                          │
           ╱    ╲                                                                  │
        yes      no                                                                │
        ╱            ╲                                                             │
       ▼                ▼                                                          │
[B:all completed?]  o-> [B:ticket.status == "todo"?]                               │
      ╱    ╲                          │                                            │
    yes     no                        ▼                                            │
    ╱         ╲               [B:pool.is_full?] ─── yes ───► (continue) ──┐        │
   ▼            ▼                     │                                   │        │
[I:join_all]   [B:in_progress?]       no                                  │        │
   │                ╱    ╲            │                                   │        │
   ▼             yes      no          ▼                                   │        │
[T:done]        ╱           ╲ [I:resolve_model]                           │        │
           (sleep 1s)    [T:blocked]  │                                   │        │
                                      ▼                                   │        │
                              [I:build_context]                           │        │
                                      │                                   │        │
                                      ▼                                   │        │
                               [I:pool.spawn]                             │        │
                           (run_worker_lifecycle)                         │        │
                                      │                                   │        │
                                      ▼                                   │        │
                             [S:active_workers]                           │        │
                      [S:ticket.status = "in_progress"]                   │        │
                    [S:event_queue.put("ticket_started")]                 │        │
                                      │                                   │        │
                                      └───────────────────────────────────┼────────┘
                                                                          │
                                                                          ▼
                                                                      (sleep 1s)
```

---

## 3. Core Loop Mechanics & Transitions

### Step 1: Thread Synchronization & Suspension Check

At the beginning of each iteration, the engine queries the pause synchronization flag:

```python
# Inside the main loop of ConductorEngine.run
if self._pause_event.is_set():
    # Publish the paused state to the GUI, back off, then re-check on the next iteration
    self._push_state(status="paused", active_tier="Paused")
    time.sleep(0.5)
    continue
```

* **SSDL shape**: `o-> [Q:_pause_event] -> [B:paused?] -> [I:sleep]`
* **Invariant**: While paused, the loop neither ticks the DAG nor spawns workers, so DAG tracking state is preserved and scheduling resumes where it left off once the flag clears.
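
The report does not show what sets `_pause_event`. Assume it is a `threading.Event` that the GUI sets to pause and clears to resume. Under that assumption, the check-sleep-continue pattern can be exercised on its own. `run_loop`, `stop_event`, and the `tick` callback are hypothetical stand-ins for the rest of `ConductorEngine.run`:

```python
import threading
import time
from typing import Callable


def run_loop(pause_event: threading.Event, stop_event: threading.Event,
             tick: Callable[[], None],
             pause_poll_s: float = 0.5, tick_poll_s: float = 1.0) -> None:
    """Hypothetical loop skeleton: the pause check guards all scheduling work."""
    while not stop_event.is_set():
        if pause_event.is_set():
            # Paused: skip tick() entirely, so no DAG state is read or changed.
            time.sleep(pause_poll_s)
            continue
        tick()  # hypothetical stand-in for engine.tick() plus worker spawning
        time.sleep(tick_poll_s)
```

Because the paused branch reaches `continue` before `tick()`, nothing reads or writes DAG state while the flag is set. The default intervals reuse the 0.5 s pause poll and the 1 s tick sleep from the loop diagram.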

### Step 2: DAG Ticking

If not paused, the engine requests a list of executable tickets from the DAG engine:

```python
self._ready_tasks = self.engine.tick()
```

* **SSDL shape**: `[I:self.engine.tick] -> [Q:ready_tasks]`
* **Details**: The underlying DAG engine resolves dependencies, reporting a ticket as ready only once its parent tickets have completed.
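
Here is a minimal sketch of the readiness rule. It assumes a hypothetical `Ticket` record with `id`, `status`, and `depends_on` fields, because this report does not show the real `TrackDAG` schema. Under this rule, a ticket is ready when it is still `"todo"` and every parent is `"completed"`.

```python
from dataclasses import dataclass, field


@dataclass
class Ticket:
    # Hypothetical shape; field names are assumptions, not the project's schema.
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def tick(tickets: list[Ticket]) -> list[Ticket]:
    """Return the tickets that can start now, preserving input order."""
    status = {t.id: t.status for t in tickets}
    return [
        t for t in tickets
        if t.status == "todo"
        and all(status.get(parent) == "completed" for parent in t.depends_on)
    ]


tickets = [Ticket("a", "completed"), Ticket("b", depends_on=["a"]), Ticket("c", depends_on=["b"])]
print([t.id for t in tick(tickets)])  # ['b']
```

In this sketch, a parent ID that does not match any ticket counts as not completed, so its child never becomes ready.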

### Step 3: Terminal State Analysis

If `ready_tasks` is empty, the engine decides whether the track is finished, still running, or blocked:

1. **Completion Check**: If every ticket has status `"completed"`, it joins the worker pool and terminates:
   `[I:self.pool.join_all] -> [T:done]`
2. **In-Progress Wait**: If some tickets are still `"in_progress"` in the pool, it sleeps for 1 second and ticks again.
3. **Blockage Check**: If no tickets are running and none are ready, the DAG is blocked (by unresolved failures or dependency cycles) and the loop exits:
   `[T:blocked]`
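
The three-way decision can be written as a pure function over ticket statuses. The status strings come from the report, while `classify_idle` and its `"done"`/`"wait"`/`"blocked"` labels are hypothetical:

```python
from collections.abc import Iterable


def classify_idle(statuses: Iterable[str]) -> str:
    """Hypothetical helper: decide what to do when tick() returned no ready tickets.

    "done"    -> join the pool and exit
    "wait"    -> sleep and tick again (workers are still running)
    "blocked" -> exit: nothing is running and nothing can start
    """
    statuses = list(statuses)
    if all(s == "completed" for s in statuses):
        return "done"
    if any(s == "in_progress" for s in statuses):
        return "wait"
    return "blocked"


print(classify_idle(["completed", "completed"]))    # done
print(classify_idle(["completed", "in_progress"]))  # wait
print(classify_idle(["completed", "todo"]))         # blocked
```

The order of the checks matters. A track with one running worker and one stalled ticket returns `"wait"`, so the loop only declares a blockage once no worker is left running.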

### Step 4: Worker Spawning & Escalation (Wide Codecycle)

For each ready ticket:

1. **Capacity Limit**: If `self.pool.is_full()` returns `True`, spawning is deferred to the next tick.
2. **Model Escalation**: The engine resolves which model to invoke from ticket overrides, persona defaults, and the ticket's current `retry_count`. Each retry after a failure escalates to a larger model (e.g. `flash-lite` -> `flash` -> `pro`); once the end of the list is reached, further retries reuse the last model:

   ```python
   # Clamp the index so retries past the end of the list stay on the last (largest) model
   model_idx = min(ticket.retry_count, len(models_list) - 1)
   model_name = models_list[model_idx]
   ```

3. **Execution**: The engine spawns the worker lifecycle thread and updates the ticket:
   * Spawns: `run_worker_lifecycle(...)`
   * Mutates status: `ticket.status = "in_progress"`
   * Emits GUI event: `"ticket_started"`
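
The report lists the inputs to model selection (ticket overrides, persona defaults, and `retry_count`) but not how they combine. This sketch assumes that a non-empty per-ticket model list takes precedence over the persona's list. `resolve_model`, `ticket_models`, and `persona_models` are hypothetical names, and the escalation ladder reuses the report's example model names:

```python
from typing import Optional, Sequence


def resolve_model(retry_count: int,
                  ticket_models: Optional[Sequence[str]],
                  persona_models: Sequence[str]) -> str:
    """Hypothetical helper: pick the model for this attempt, escalating one step per retry."""
    # Assumption: a non-empty per-ticket override list wins over the persona defaults.
    models_list = list(ticket_models) if ticket_models else list(persona_models)
    if not models_list:
        raise ValueError("no candidate models configured")
    # Same clamping as the engine excerpt: retries past the end reuse the last model.
    model_idx = min(retry_count, len(models_list) - 1)
    return models_list[model_idx]


ESCALATION = ["flash-lite", "flash", "pro"]  # the report's example ladder
for attempt in range(4):
    print(attempt, resolve_model(attempt, None, ESCALATION))
# 0 flash-lite
# 1 flash
# 2 pro
# 3 pro
```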

---

## 4. Architectural Invariants

1. **Amnesia Principle**: Before a spawned worker calls the AI client, it executes `ai_client.reset_session()`. This prevents context bleed and token leakage between parallel workers executing distinct tickets.
2. **Step Mode Control**: Tickets marked with `step_mode=True` are not spawned automatically; they wait for manual human approval in the GUI before transitioning from `"todo"` to `"in_progress"`.
3. **Queue Telemetry**: State mutations and status changes are pushed thread-safely to the main GUI thread via the `event_queue` helper, keeping the ImGui visualization synchronized.
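
The report shows neither the `event_queue` transport nor the step-mode gate. A hedged sketch using the standard library's thread-safe `queue.Queue` follows. The `(event_name, payload)` tuple, the `approved` set, and both helper names are hypothetical:

```python
import queue
from typing import Any

GuiEvent = tuple[str, Any]  # hypothetical (event_name, payload) shape


def can_auto_start(ticket_id: str, step_mode: bool, approved: set[str]) -> bool:
    """Hypothetical gate: step-mode tickets start only after a human approved them in the GUI."""
    return not step_mode or ticket_id in approved


def drain_events(events: "queue.Queue[GuiEvent]", max_items: int = 100) -> list[GuiEvent]:
    """Run once per GUI frame: take pending events without ever blocking the render loop."""
    drained: list[GuiEvent] = []
    for _ in range(max_items):
        try:
            drained.append(events.get_nowait())
        except queue.Empty:
            break
    return drained


# Worker threads call put(); the GUI thread drains on its own schedule.
events: "queue.Queue[GuiEvent]" = queue.Queue()
events.put(("ticket_started", "t1"))
print(drain_events(events))  # [('ticket_started', 't1')]
print(drain_events(events))  # []
```

Draining with `get_nowait()` and a per-frame cap keeps the ImGui render loop from stalling even when workers emit events faster than the GUI consumes them.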