refactor(conductor): Audit and cleanup multi_agent_conductor.py
This commit is contained in:
@@ -27,34 +27,6 @@ See Also:
|
|||||||
- src/dag_engine.py for TrackDAG and ExecutionEngine
|
- src/dag_engine.py for TrackDAG and ExecutionEngine
|
||||||
- src/models.py for Ticket, Track, WorkerContext
|
- src/models.py for Ticket, Track, WorkerContext
|
||||||
"""
|
"""
|
||||||
"""
|
|
||||||
Multi-Agent Conductor - MMA 4-Tier orchestration engine.
|
|
||||||
|
|
||||||
This module provides the ConductorEngine and WorkerPool for orchestrating
|
|
||||||
the execution of implementation tickets within a Track using the DAG engine
|
|
||||||
and the bounded concurrent worker pool with abort event propagation.
|
|
||||||
|
|
||||||
Key Components:
|
|
||||||
- ConductorEngine: Tier 2 orchestrator that owns the execution loop
|
|
||||||
- WorkerPool: Bounded concurrent worker pool with semaphore gating
|
|
||||||
- run_worker_lifecycle: Stateless Tier 3 worker execution with context amnesia
|
|
||||||
|
|
||||||
Thread Safety:
|
|
||||||
- All state mutations use locks (_workers_lock, _tier_usage_lock)
|
|
||||||
- Worker threads are daemon threads that clean up on exit
|
|
||||||
- Abort events enable per-ticket cancellation
|
|
||||||
|
|
||||||
Integration:
|
|
||||||
- Uses AsyncEventQueue for state updates to the GUI
|
|
||||||
- Uses ai_client.send() for LLM communication
|
|
||||||
- Uses mcp_client for tool dispatch
|
|
||||||
|
|
||||||
See Also:
|
|
||||||
- docs/guide_mma.md for MMA orchestration documentation
|
|
||||||
- src/dag_engine.py for TrackDAG and ExecutionEngine
|
|
||||||
- src/ai_client.py for multi-provider LLM abstraction
|
|
||||||
- src/models.py for Ticket, Track, WorkerContext data structures
|
|
||||||
"""
|
|
||||||
from src import ai_client
|
from src import ai_client
|
||||||
import json
|
import json
|
||||||
import threading
|
import threading
|
||||||
@@ -151,6 +123,7 @@ class ConductorEngine:
|
|||||||
self._tier_usage_lock = threading.Lock()
|
self._tier_usage_lock = threading.Lock()
|
||||||
|
|
||||||
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
|
def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
|
||||||
|
"""Updates token usage for a specific tier."""
|
||||||
with self._tier_usage_lock:
|
with self._tier_usage_lock:
|
||||||
if tier in self.tier_usage:
|
if tier in self.tier_usage:
|
||||||
self.tier_usage[tier]["input"] += input_tokens
|
self.tier_usage[tier]["input"] += input_tokens
|
||||||
@@ -180,6 +153,7 @@ class ConductorEngine:
|
|||||||
self._active_workers.pop(ticket_id, None)
|
self._active_workers.pop(ticket_id, None)
|
||||||
|
|
||||||
def _push_state(self, status: str = "running", active_tier: str = None) -> None:
|
def _push_state(self, status: str = "running", active_tier: str = None) -> None:
|
||||||
|
"""Pushes the current engine state to the GUI."""
|
||||||
if not self.event_queue:
|
if not self.event_queue:
|
||||||
return
|
return
|
||||||
payload = {
|
payload = {
|
||||||
@@ -289,7 +263,6 @@ class ConductorEngine:
|
|||||||
model_name = ticket.model_override
|
model_name = ticket.model_override
|
||||||
else:
|
else:
|
||||||
# Check if ticket has a persona with preferred_models
|
# Check if ticket has a persona with preferred_models
|
||||||
models_list = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]
|
|
||||||
if ticket.persona_id:
|
if ticket.persona_id:
|
||||||
# Try to load preferred_models from persona
|
# Try to load preferred_models from persona
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user