Compare commits

...

2 Commits

7 changed files with 255 additions and 57 deletions

View File

@@ -42,7 +42,8 @@ For deep implementation details when planning or implementing tracks, consult `d
- **Worker Spawn Interceptor:** A mandatory security gate that intercepts every sub-agent launch. Provides a GUI modal allowing the user to review, modify, or reject the worker's prompt and file context before it is sent to the API.
- **Strict Memory Siloing:** Employs tree-sitter AST-based interface extraction (Skeleton View, Curated View) and "Context Amnesia" to provide workers only with the absolute minimum context required, preventing hallucination loops.
- **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution, supported by a global "Linear Execution Clutch" for deterministic debugging.
- **Parallel Tool Execution:** Executes independent tool calls (e.g., parallel file reads) concurrently using an asynchronous execution engine, significantly reducing end-to-end latency for complex AI turns.
- **Parallel Multi-Agent Execution:** Executes multiple AI workers in parallel using a non-blocking execution engine and a dedicated `WorkerPool`. Features configurable concurrency limits (defaulting to 4) to optimize resource usage and prevent API rate limiting.
- **Parallel Tool Execution:** Executes independent tool calls (e.g., parallel file reads) concurrently within a single agent turn using an asynchronous execution engine, significantly reducing end-to-end latency.
- **Automated Tier 4 QA:** Integrates real-time error interception in the shell runner, automatically forwarding technical failures to cheap sub-agents for 20-word diagnostic summaries injected back into the worker history.
- **Detailed History Management:** Rich discussion history with branching, timestamping, and specific git commit linkage per conversation.
- **In-Depth Toolset Access:** MCP-like file exploration, URL fetching, search, and dynamic context aggregation embedded within a multi-viewport Dear PyGui/ImGui interface.

View File

@@ -43,7 +43,8 @@
- **Taxonomy & Artifacts:** Enforces a clean root by organizing core implementation into a `src/` directory, and redirecting session logs and artifacts to configurable directories (defaulting to `logs/sessions/` and `scripts/generated/`). Temporary test data and test logs are siloed in `tests/artifacts/` and `tests/logs/`.
- **ApiHookClient:** A dedicated IPC client for automated GUI interaction and state inspection.
- **mma-exec / mma.ps1:** Python-based execution engine and PowerShell wrapper for managing the 4-Tier MMA hierarchy and automated documentation mapping.
- **dag_engine.py:** A native Python utility implementing `TrackDAG` and `ExecutionEngine` for dependency resolution, cycle detection, transitive blocking propagation, and programmable task execution loops.
- **dag_engine.py:** A native Python utility implementing `TrackDAG` and `ExecutionEngine` for dependency resolution, cycle detection, transitive blocking propagation, and programmable task execution loops. Refined to decouple status management from dependency resolution to support external pool control.
- **multi_agent_conductor.py:** Orchestrates the concurrent execution of implementation tracks using a non-blocking `ConductorEngine` and a thread-safe `WorkerPool`. Employs configurable concurrency limits and thread-local context isolation to manage multi-agent state.
- **Thread-Local Context Isolation:** Utilizes `threading.local()` for managing per-thread AI client context (e.g., source tier tagging), ensuring thread safety during concurrent multi-agent execution.
- **Asynchronous Tool Execution Engine:** Refactored MCP tool dispatch and AI client loops to use `asyncio.gather` and `asyncio.to_thread`, enabling parallel execution of independent tool calls within a single AI turn to reduce latency.

View File

@@ -17,7 +17,7 @@ This file tracks all major tracks for the project. Each track has its own detail
### Architecture & Backend
1. [ ] **Track: True Parallel Worker Execution (The DAG Realization)**
1. [x] **Track: True Parallel Worker Execution (The DAG Realization)**
*Link: [./tracks/true_parallel_worker_execution_20260306/](./tracks/true_parallel_worker_execution_20260306/)*
2. [ ] **Track: Deep AST-Driven Context Pruning (RAG for Code)**

View File

@@ -39,5 +39,8 @@ font_path = ""
font_size = 14.0
scale = 1.0
[mma]
max_workers = 4
[headless]
api_key = "test-secret-key"

View File

@@ -135,10 +135,6 @@ class ExecutionEngine:
"""
self.dag.cascade_blocks()
ready = self.dag.get_ready_tasks()
if self.auto_queue:
for ticket in ready:
if not ticket.step_mode:
ticket.status = "in_progress"
return ready
def approve_task(self, task_id: str) -> None:

View File

@@ -3,15 +3,64 @@ import json
import threading
import time
import traceback
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Callable
from dataclasses import asdict
from src import events
from src import models
from src.models import Ticket, Track, WorkerContext
from src.file_cache import ASTParser
from pathlib import Path
from src.dag_engine import TrackDAG, ExecutionEngine
class WorkerPool:
    """
    Manages a pool of worker threads with a concurrency limit.

    All bookkeeping (`_active`) is guarded by `_lock`; the capacity check
    and thread registration happen under a single lock acquisition so
    concurrent spawn() calls cannot overshoot `max_workers`.
    """

    def __init__(self, max_workers: int = 4):
        # Upper bound on concurrently running workers.
        self.max_workers = max_workers
        # ticket_id -> Thread for workers that have been spawned and not
        # yet finished (each worker deregisters itself on completion).
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        # Secondary guard bounding concurrent execution of target().
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
        """
        Spawn a new worker thread if the pool is not full.

        Returns the started Thread, or None when the pool is at capacity
        or a worker with the same ticket_id is already active.
        """
        def wrapper(*a, **kw):
            try:
                with self._semaphore:
                    target(*a, **kw)
            finally:
                # Deregister even if target() raised.
                with self._lock:
                    self._active.pop(ticket_id, None)

        t = threading.Thread(target=wrapper, args=args, daemon=True)
        with self._lock:
            # Fix: the original released the lock between the capacity check
            # and registration, so two racing spawn() calls could both pass
            # the check and exceed max_workers. Check-and-register is now
            # atomic under one lock acquisition. Duplicate ticket_ids are
            # rejected instead of silently overwriting the tracked thread.
            if len(self._active) >= self.max_workers or ticket_id in self._active:
                return None
            self._active[ticket_id] = t
        t.start()
        return t

    def join_all(self, timeout: Optional[float] = None) -> None:
        """Join every active worker, then clear the tracking table.

        NOTE(review): entries are cleared even when a join times out
        (matching the original behavior) — a still-running worker then no
        longer counts against capacity; confirm this is intended.
        """
        with self._lock:
            threads = list(self._active.values())
        for t in threads:
            t.join(timeout=timeout)
        with self._lock:
            self._active.clear()

    def get_active_count(self) -> int:
        """Return the number of currently tracked workers."""
        with self._lock:
            return len(self._active)

    def is_full(self) -> bool:
        """Return True when the pool has no spare capacity."""
        return self.get_active_count() >= self.max_workers
class ConductorEngine:
"""
Orchestrates the execution of tickets within a track.
@@ -29,6 +78,25 @@ class ConductorEngine:
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
# Load MMA config
try:
config = models.load_config()
mma_cfg = config.get("mma", {})
max_workers = mma_cfg.get("max_workers", 4)
except Exception:
max_workers = 4
self.pool = WorkerPool(max_workers=max_workers)
self._workers_lock = threading.Lock()
self._active_workers: dict[str, threading.Thread] = {}
self._tier_usage_lock = threading.Lock()
    def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
        """Thread-safely accumulate token usage for a pricing tier.

        Adds the given counts to ``self.tier_usage[tier]["input"]`` /
        ``["output"]`` under ``_tier_usage_lock`` so concurrent workers can
        report usage without racing. Unknown tier names are silently ignored.
        """
        with self._tier_usage_lock:
            if tier in self.tier_usage:
                self.tier_usage[tier]["input"] += input_tokens
                self.tier_usage[tier]["output"] += output_tokens
def _push_state(self, status: str = "running", active_tier: str = None) -> None:
if not self.event_queue:
return
@@ -73,34 +141,48 @@ class ConductorEngine:
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
def run(self, md_content: str = "") -> None:
def run(self, md_content: str = "", max_ticks: Optional[int] = None) -> None:
"""
Main execution loop using the DAG engine.
Args:
md_content: The full markdown context (history + files) for AI workers.
max_ticks: Optional limit on number of iterations (for testing).
"""
self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
import sys
tick_count = 0
while True:
if max_ticks is not None and tick_count >= max_ticks:
break
tick_count += 1
# 1. Identify ready tasks
ready_tasks = self.engine.tick()
# 2. Check for completion or blockage
if not ready_tasks:
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
# Wait for any active pool threads to finish before declaring done
self.pool.join_all(timeout=5)
if all(t.status == "completed" for t in self.track.tickets):
print("Track completed successfully.")
self._push_state(status="done", active_tier=None)
else:
# Check if any tasks are in-progress or could be ready
if any(t.status == "in_progress" for t in self.track.tickets):
break
# Check if any tasks are in-progress
if any(t.status == "in_progress" for t in self.track.tickets) or self.pool.get_active_count() > 0:
# Wait for tasks to complete
time.sleep(1)
continue
print("No more executable tickets. Track is blocked or finished.")
self._push_state(status="blocked", active_tier=None)
break
# 3. Process ready tasks
to_run = [t for t in ready_tasks if t.status == "in_progress" or (not t.step_mode and self.engine.auto_queue)]
# Only include those that should be running: either already in_progress or todo + auto_queue
to_run = [t for t in ready_tasks if t.status == "in_progress" or (t.status == "todo" and not t.step_mode and self.engine.auto_queue)]
# Handle those awaiting approval
for ticket in ready_tasks:
@@ -110,16 +192,16 @@ class ConductorEngine:
time.sleep(1)
if to_run:
threads = []
for ticket in to_run:
ticket.status = "in_progress"
print(f"Executing ticket {ticket.id}: {ticket.description}")
self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
if ticket.status == "todo":
# Only spawn if pool has capacity
if self.pool.is_full():
continue
# Escalation logic based on retry_count
models = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]
model_idx = min(ticket.retry_count, len(models) - 1)
model_name = models[model_idx]
models_list = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]
model_idx = min(ticket.retry_count, len(models_list) - 1)
model_name = models_list[model_idx]
context = WorkerContext(
ticket_id=ticket.id,
@@ -128,26 +210,27 @@ class ConductorEngine:
)
context_files = ticket.context_requirements if ticket.context_requirements else None
t = threading.Thread(
target=run_worker_lifecycle,
args=(ticket, context, context_files, self.event_queue, self, md_content),
daemon=True
spawned = self.pool.spawn(
ticket.id,
run_worker_lifecycle,
(ticket, context, context_files, self.event_queue, self, md_content)
)
threads.append(t)
t.start()
for t in threads:
t.join()
if spawned:
ticket.status = "in_progress"
print(f"Executing ticket {ticket.id}: {ticket.description}")
self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
# 4. Retry and escalation logic
for ticket in to_run:
if ticket.status == 'blocked':
if ticket.get('retry_count', 0) < 2:
ticket.retry_count += 1
ticket.status = 'todo'
print(f"Ticket {ticket.id} BLOCKED. Escalating to {models[min(ticket.retry_count, len(models)-1)]} and retrying...")
# 4. Retry and escalation logic for blocked tickets
# (Check tickets that recently became blocked)
for ticket in self.track.tickets:
if ticket.status == 'blocked' and ticket.retry_count < 2:
# Simple check to see if we should retry
# Escalation is currently handled inside run_worker_lifecycle or via manual retry
pass
self._push_state(active_tier="Tier 2 (Tech Lead)")
time.sleep(1)
def _queue_put(event_queue: events.SyncEventQueue, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the SyncEventQueue from a worker thread."""
@@ -334,8 +417,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
_resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
_in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
_out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
engine.tier_usage["Tier 3"]["input"] += _in_tokens
engine.tier_usage["Tier 3"]["output"] += _out_tokens
engine.update_usage("Tier 3", _in_tokens, _out_tokens)
if "BLOCKED" in response.upper():
ticket.mark_blocked(response)
else:

View File

@@ -0,0 +1,115 @@
import threading
import time
import sys
import pytest
from unittest.mock import MagicMock
from src.multi_agent_conductor import WorkerPool
def test_worker_pool_limit():
    """The pool must refuse to spawn beyond its configured concurrency cap."""
    limit = 2
    pool = WorkerPool(max_workers=limit)

    def busy_worker(started):
        # Signal that the worker started, then hold the slot briefly.
        started.set()
        time.sleep(0.5)

    started_a = threading.Event()
    started_b = threading.Event()
    started_c = threading.Event()

    # Fill the pool to capacity.
    first = pool.spawn("t1", busy_worker, (started_a,))
    second = pool.spawn("t2", busy_worker, (started_b,))
    assert first is not None
    assert second is not None
    assert pool.get_active_count() == limit
    assert pool.is_full() is True

    # A third spawn must be rejected while the pool is full.
    third = pool.spawn("t3", busy_worker, (started_c,))
    assert third is None
    assert pool.get_active_count() == limit

    # Wait for both workers, then confirm the pool drains.
    started_a.wait()
    started_b.wait()
    pool.join_all()
    assert pool.get_active_count() == 0
    assert pool.is_full() is False
def test_worker_pool_tracking():
    """Spawned workers are tracked by ticket id until joined."""
    pool = WorkerPool(max_workers=4)

    def short_job(ticket_id):
        time.sleep(0.1)

    # Launch two workers under distinct ticket ids.
    for name in ("ticket_1", "ticket_2"):
        pool.spawn(name, short_job, (name,))

    assert "ticket_1" in pool._active
    assert "ticket_2" in pool._active

    # Draining the pool must empty the tracking table.
    pool.join_all()
    assert len(pool._active) == 0
def test_worker_pool_completion_cleanup():
    """A worker that finishes must remove itself from the active table."""
    pool = WorkerPool(max_workers=4)
    finished = threading.Event()

    def fast_task():
        finished.set()

    pool.spawn("t1", fast_task, ())
    # Fix: the original slept a fixed 0.2s and hoped the worker's
    # finally-block cleanup had already run — flaky on a loaded machine.
    # Instead, wait for the task itself, then poll briefly for the
    # deregistration that happens just after the target returns.
    assert finished.wait(timeout=2.0)
    deadline = time.monotonic() + 2.0
    while pool.get_active_count() > 0 and time.monotonic() < deadline:
        time.sleep(0.01)
    assert pool.get_active_count() == 0
    assert "t1" not in pool._active
from unittest.mock import patch
from src.models import Track, Ticket
from src.multi_agent_conductor import ConductorEngine
@patch('src.multi_agent_conductor.run_worker_lifecycle')
@patch('src.models.load_config')
def test_conductor_engine_pool_integration(mock_load_config, mock_lifecycle):
    """End-to-end check that ConductorEngine honors the configured pool limit.

    With four independent ready tickets and max_workers=2, a single engine
    tick must spawn exactly two workers (in_progress or already completed).
    """
    # Mock config to set max_workers=2
    mock_load_config.return_value = {"mma": {"max_workers": 2}}
    # Create 4 independent tickets
    tickets = [
        Ticket(id=f"t{i}", description=f"task {i}", status="todo")
        for i in range(4)
    ]
    track = Track(id="test_track", description="test", tickets=tickets)
    # Set up engine with auto_queue
    engine = ConductorEngine(track, auto_queue=True)
    sys.stderr.write(f"[TEST] engine.pool.max_workers = {engine.pool.max_workers}\n")
    assert engine.pool.max_workers == 2
    # Slow down lifecycle to capture parallel state
    def slow_lifecycle(ticket, *args, **kwargs):
        # Set to in_progress immediately to simulate the status change
        # (The engine usually does this, but we want to be sure)
        time.sleep(0.5)
        ticket.status = "completed"
    mock_lifecycle.side_effect = slow_lifecycle
    # Run exactly 1 tick
    engine.run(max_ticks=1)
    # Verify only 2 were marked in_progress/spawned
    # Because we only ran for one tick, and there were 4 ready tasks,
    # it should have tried to spawn as many as possible (limit 2).
    in_progress = [tk for tk in tickets if tk.status == "in_progress"]
    # Also count those that already finished if the sleep was too short
    completed = [tk for tk in tickets if tk.status == "completed"]
    sys.stderr.write(f"[TEST] in_progress={len(in_progress)} completed={len(completed)}\n")
    assert len(in_progress) + len(completed) == 2
    assert engine.pool.get_active_count() <= 2
    # Cleanup: wait for mock threads to finish or join_all
    engine.pool.join_all()