# Commit b043d06771: fixes the 'stuck' behavior in concurrent tests by
# ensuring the tests look for standard completion markers and don't wait
# for unnecessary timeouts.
"""
|
|
Multi-Agent Conductor - 4-tier MMA orchestration engine.
|
|
|
|
This module implements the ConductorEngine and WorkerPool for executing
|
|
implementation tracks via the 4-tier Multi-Model Agent (MMA) hierarchy.
|
|
|
|
Key Components:
|
|
- WorkerPool: Bounded concurrent worker pool with semaphore gating
|
|
- ConductorEngine: Main orchestration loop with DAG execution
|
|
- run_worker_lifecycle: Tier 3 worker execution function
|
|
|
|
Architecture Integration:
|
|
- Uses TrackDAG and ExecutionEngine from dag_engine.py
|
|
- Communicates with GUI via AsyncEventQueue
|
|
- Manages tier-specific token usage via update_usage()
|
|
|
|
Thread Safety:
|
|
- WorkerPool uses threading.Lock for all state mutations
|
|
- ConductorEngine uses _tier_usage_lock for tier usage tracking
|
|
- Abort events use threading.Event for worker cancellation
|
|
|
|
Configuration:
|
|
- max_workers: Loaded from config.toml [mma].max_workers (default: 4)
|
|
|
|
See Also:
|
|
- docs/guide_mma.md for full MMA documentation
|
|
- src/dag_engine.py for TrackDAG and ExecutionEngine
|
|
- src/models.py for Ticket, Track, WorkerContext
|
|
"""
|
|
from src import ai_client
|
|
from src import summarize
|
|
import json
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from typing import List, Optional, Tuple, Callable
|
|
from dataclasses import asdict
|
|
from src import events
|
|
from src import models
|
|
from src.models import Ticket, Track, WorkerContext
|
|
from src.file_cache import ASTParser
|
|
from pathlib import Path
|
|
from src.personas import PersonaManager
|
|
from src import paths
|
|
|
|
from src.dag_engine import TrackDAG, ExecutionEngine
|
|
|
|
class WorkerPool:
    """
    Manages a pool of worker threads with a concurrency limit.

    Capacity is enforced in two layers: spawn() refuses new work once
    max_workers threads are registered in `_active`, and `_semaphore`
    gates actual execution of the worker bodies.
    """

    def __init__(self, max_workers: int = 4):
        """
        Args:
            max_workers: Maximum number of concurrently registered workers.
        """
        self.max_workers = max_workers
        # ticket_id -> thread; guarded by _lock.
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
        """
        Spawns a new worker thread if the pool is not full.

        Args:
            ticket_id: Key under which the thread is registered.
            target: Callable executed on the worker thread.
            args: Positional arguments forwarded to `target`.

        Returns:
            The started daemon thread, or None if the pool is full.
        """

        def wrapper(*a, **kw):
            try:
                with self._semaphore:
                    target(*a, **kw)
            finally:
                # Always deregister, even if target raised.
                with self._lock:
                    self._active.pop(ticket_id, None)

        t = threading.Thread(target=wrapper, args=args, daemon=True)
        # BUGFIX: capacity check and registration must happen atomically.
        # Previously they were done under two separate lock acquisitions,
        # allowing two concurrent spawn() calls to both pass the check and
        # exceed max_workers registered threads.
        with self._lock:
            if len(self._active) >= self.max_workers:
                return None
            self._active[ticket_id] = t
        t.start()
        return t

    def join_all(self, timeout: Optional[float] = None) -> None:
        """Joins every active thread (each with `timeout`) and clears the registry."""
        with self._lock:
            threads = list(self._active.values())
        for t in threads:
            t.join(timeout=timeout)
        with self._lock:
            self._active.clear()

    def get_active_count(self) -> int:
        """Returns the number of currently registered worker threads."""
        with self._lock:
            return len(self._active)

    def is_full(self) -> bool:
        """True when the pool has reached max_workers registered threads."""
        return self.get_active_count() >= self.max_workers
|
|
|
|
class ConductorEngine:
    """
    Orchestrates the execution of tickets within a track.

    The run() loop repeatedly asks the ExecutionEngine for ready tickets,
    dispatches them to Tier 3 workers through a bounded WorkerPool, and
    mirrors state to the GUI via the AsyncEventQueue. Pause/resume, manual
    approval, and per-worker abort (kill_worker) are supported.
    """

    def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
        """
        Args:
            track: Track whose tickets are scheduled and executed.
            event_queue: Optional GUI queue for state/approval events.
            auto_queue: When True, ready 'todo' tickets not in step_mode
                start without manual approval.
        """
        self.track = track
        self.event_queue = event_queue
        # Per-tier token counters plus model/preset config; counters are
        # mutated only under _tier_usage_lock (see update_usage).
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)

        # Load MMA config; any failure falls back to 4 workers.
        try:
            config = models.load_config()
            mma_cfg = config.get("mma", {})
            max_workers = mma_cfg.get("max_workers", 4)
        except Exception:
            max_workers = 4

        self.pool = WorkerPool(max_workers=max_workers)
        self._workers_lock = threading.Lock()
        self._active_workers: dict[str, threading.Thread] = {}
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event: threading.Event = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self._dirty: bool = True

    def update_usage(self, tier: str, input_tokens: int, output_tokens: int) -> None:
        """Updates token usage for a specific tier (thread-safe)."""
        with self._tier_usage_lock:
            if tier in self.tier_usage:
                self.tier_usage[tier]["input"] += input_tokens
                self.tier_usage[tier]["output"] += output_tokens

    def pause(self) -> None:
        """Pauses the pipeline execution."""
        self._pause_event.set()

    def resume(self) -> None:
        """Resumes the pipeline execution."""
        self._pause_event.clear()

    def approve_task(self, task_id: str) -> None:
        """Manually transition todo to in_progress and mark engine dirty."""
        self.engine.approve_task(task_id)
        self._dirty = True

    def update_task_status(self, task_id: str, status: str) -> None:
        """Force-update ticket status and mark engine dirty."""
        self.engine.update_task_status(task_id, status)
        self._dirty = True

    def kill_worker(self, ticket_id: str) -> None:
        """Sets the abort event for a worker and attempts to join its thread."""
        if ticket_id in self._abort_events:
            print(f"[MMA] Setting abort event for {ticket_id}")
            self._abort_events[ticket_id].set()

        with self._workers_lock:
            thread = self._active_workers.get(ticket_id)

        if thread:
            print(f"[MMA] Joining thread for {ticket_id}")
            # Short join only: the worker polls its abort event at several
            # checkpoints and may take longer than 1s to actually stop.
            thread.join(timeout=1.0)
            with self._workers_lock:
                self._active_workers.pop(ticket_id, None)

    def _push_state(self, status: str = "running", active_tier: Optional[str] = None) -> None:
        """Pushes the current engine state to the GUI (no-op without a queue)."""
        if not self.event_queue:
            return
        payload = {
            "status": status,
            "active_tier": active_tier,
            "tier_usage": self.tier_usage,
            "track": {
                "id": self.track.id,
                "title": self.track.description,
            },
            "tickets": [asdict(t) for t in self.track.tickets]
        }
        self.event_queue.put("mma_state_update", payload)

    def parse_json_tickets(self, json_str: str) -> None:
        """
        Parses a JSON string of ticket definitions (Godot ECS Flat List format)
        and populates the Track's ticket list.

        Each entry requires 'id' and 'description'; other fields default.
        Rebuilds the DAG and ExecutionEngine after a successful parse.
        """
        try:
            data = json.loads(json_str)
            if not isinstance(data, list):
                print("Error: JSON input must be a list of ticket definitions.")
                return
            for ticket_data in data:
                # Construct Ticket object, using defaults for optional fields
                ticket = Ticket(
                    id=ticket_data["id"],
                    description=ticket_data["description"],
                    status=ticket_data.get("status", "todo"),
                    assigned_to=ticket_data.get("assigned_to", "unassigned"),
                    depends_on=ticket_data.get("depends_on", []),
                    step_mode=ticket_data.get("step_mode", False)
                )
                self.track.tickets.append(ticket)
            # Rebuild DAG and Engine after parsing new tickets
            self.dag = TrackDAG(self.track.tickets)
            self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON tickets: {e}")
        except KeyError as e:
            print(f"Missing required field in ticket definition: {e}")

    def run(self, md_content: str = "", max_ticks: Optional[int] = None) -> None:
        """
        Main execution loop using the DAG engine.

        Args:
            md_content: The full markdown context (history + files) for AI workers.
            max_ticks: Optional limit on number of iterations (for testing).
        """
        tick_count = 0
        while True:
            # Paused: just heartbeat the GUI and retry; paused ticks do not
            # count toward max_ticks.
            if self._pause_event.is_set():
                self._push_state(status="paused", active_tier="Paused")
                time.sleep(0.5)
                continue
            self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
            if max_ticks is not None and tick_count >= max_ticks:
                break
            tick_count += 1
            # 1. Identify ready tasks - always recompute to ensure fresh state
            self._ready_tasks = self.engine.tick()
            ready_tasks = self._ready_tasks

            # 2. Check for completion or blockage
            if not ready_tasks:
                all_done = all(t.status == "completed" for t in self.track.tickets)
                if all_done:
                    # Wait for any active pool threads to finish before declaring done
                    self.pool.join_all(timeout=5)
                    if all(t.status == "completed" for t in self.track.tickets):
                        print("Track completed successfully.")
                        self._push_state(status="done", active_tier=None)
                        break

                # Check if any tasks are in-progress
                if any(t.status == "in_progress" for t in self.track.tickets) or self.pool.get_active_count() > 0:
                    # Wait for tasks to complete
                    time.sleep(1)
                    continue

                print("No more executable tickets. Track is blocked or finished.")
                self._push_state(status="blocked", active_tier=None)
                break

            # 3. Process ready tasks
            # Only include those that should be running: either already in_progress or todo + auto_queue
            to_run = [t for t in ready_tasks if t.status == "in_progress" or (t.status == "todo" and not t.step_mode and self.engine.auto_queue)]

            # Handle those awaiting approval
            for ticket in ready_tasks:
                if ticket not in to_run and ticket.status == "todo":
                    print(f"Ticket {ticket.id} is ready and awaiting approval.")
                    self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
                    time.sleep(1)

            if to_run:
                for ticket in to_run:
                    if ticket.status == "todo":
                        # Only spawn if pool has capacity
                        if self.pool.is_full():
                            continue

                        # Escalation logic based on retry_count
                        models_list = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]
                        if ticket.model_override:
                            model_name = ticket.model_override
                        else:
                            # Check if ticket has a persona with preferred_models
                            if ticket.persona_id:
                                # BUGFIX: was a bare `except:`, which also
                                # swallowed KeyboardInterrupt/SystemExit.
                                try:
                                    pm = PersonaManager(Path(paths.get_project_personas_path(Path.cwd())) if paths.get_project_personas_path(Path.cwd()).exists() else None)
                                    personas = pm.load_all()
                                    if ticket.persona_id in personas:
                                        persona = personas[ticket.persona_id]
                                        if persona.preferred_models:
                                            models_list = persona.preferred_models
                                except Exception:
                                    pass  # Fall back to default list
                            model_idx = min(ticket.retry_count, len(models_list) - 1)
                            model_name = models_list[model_idx]

                        context = WorkerContext(
                            ticket_id=ticket.id,
                            model_name=model_name,
                            messages=[],
                            tool_preset=self.tier_usage["Tier 3"]["tool_preset"],
                            persona_id=ticket.persona_id or self.tier_usage["Tier 3"].get("persona")
                        )
                        context_files = ticket.context_requirements if ticket.context_requirements else None

                        # Initialize abort event before spawning
                        self._abort_events[ticket.id] = threading.Event()

                        spawned = self.pool.spawn(
                            ticket.id,
                            run_worker_lifecycle,
                            (ticket, context, context_files, self.event_queue, self, md_content)
                        )

                        if spawned:
                            with self._workers_lock:
                                self._active_workers[ticket.id] = spawned
                            ticket.status = "in_progress"
                            _queue_put(self.event_queue, "ticket_started", {"ticket_id": ticket.id, "timestamp": time.time(), "persona_id": context.persona_id, "model": model_name})
                            print(f"Executing ticket {ticket.id}: {ticket.description}")
                            self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")

            # 4. Retry and escalation logic for blocked tickets
            # (Check tickets that recently became blocked)
            for ticket in self.track.tickets:
                if ticket.status == 'blocked' and ticket.retry_count < 2:
                    # Simple check to see if we should retry
                    # Escalation is currently handled inside run_worker_lifecycle or via manual retry
                    pass

            self._push_state(active_tier="Tier 2 (Tech Lead)")
            time.sleep(1)
|
|
|
|
def _queue_put(event_queue: events.AsyncEventQueue, event_name: str, payload) -> None:
    """Push an event onto the AsyncEventQueue from a worker thread; no-op when no queue is attached."""
    if event_queue is None:
        return
    event_queue.put(event_name, payload)
|
|
|
|
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
    """
    Pushes an approval request to the GUI and waits for response.

    Returns True only when the GUI dialog reports approval; False on
    timeout or rejection.
    """
    container = [None]
    _queue_put(event_queue, "mma_step_approval", {
        "action": "mma_step_approval",
        "ticket_id": ticket_id,
        "payload": payload,
        "dialog_container": container,
    })

    # Poll until the GUI installs a dialog object, giving up after 60s.
    deadline = time.time() + 60
    while container[0] is None and time.time() < deadline:
        time.sleep(0.1)

    dialog = container[0]
    if not dialog:
        return False
    approved, _ = dialog.wait()
    return approved
|
|
|
|
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
    """
    Pushes a spawn approval request to the GUI and waits for response.
    Returns (approved, modified_prompt, modified_context)
    """
    container = [None]
    _queue_put(event_queue, "mma_spawn_approval", {
        "action": "mma_spawn_approval",
        "ticket_id": ticket_id,
        "role": role,
        "prompt": prompt,
        "context_md": context_md,
        "dialog_container": container,
    })

    # Poll until the GUI installs a dialog object, giving up after 60s.
    deadline = time.time() + 60
    while container[0] is None and time.time() < deadline:
        time.sleep(0.1)

    dialog = container[0]
    if not dialog:
        return False, prompt, context_md

    res = dialog.wait()
    if isinstance(res, dict):
        # New dict-style response: explicit approved/abort flags plus
        # optionally edited prompt and context.
        ok = res.get("approved", False) and not res.get("abort", False)
        return ok, res.get("prompt", prompt), res.get("context_md", context_md)

    # Fallback for old (approved, payload) tuple style if any.
    approved, final_payload = res
    new_prompt, new_context = prompt, context_md
    if isinstance(final_payload, dict):
        new_prompt = final_payload.get("prompt", prompt)
        new_context = final_payload.get("context_md", context_md)
    return approved, new_prompt, new_context
|
|
|
|
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> str | None:
    """
    Simulates the lifecycle of a single agent working on a ticket.

    Calls the AI client and updates the ticket status based on the response.

    Args:
        ticket: The ticket to process.
        context: The worker context.
        context_files: List of files to include in the context.
        event_queue: Queue for pushing state updates and receiving approvals.
        engine: The conductor engine.
        md_content: The markdown context (history + files) for AI workers.

    Returns:
        "ABORTED" if the worker's abort event fired, a "BLOCKED: ..."
        message if the spawn was rejected, otherwise the raw AI response.
        (Annotation corrected from None — the body returns strings.)
    """
    # Enforce Context Amnesia: each ticket starts with a clean slate.
    ai_client.reset_session()
    ai_client.set_provider(ai_client.get_provider(), context.model_name)

    # Apply Persona if specified.
    # NOTE(review): preferred_models is collected here but not used below;
    # model selection already happened in ConductorEngine.run — confirm.
    preferred_models = []
    persona_tool_preset = None
    persona = None
    if context.persona_id:
        # Prefer project-local personas directory when it exists.
        pm = PersonaManager(Path(paths.get_project_personas_path(Path.cwd())) if paths.get_project_personas_path(Path.cwd()).exists() else None)
        try:
            personas = pm.load_all()
            if context.persona_id in personas:
                persona = personas[context.persona_id]
                if persona.system_prompt:
                    ai_client.set_custom_system_prompt(persona.system_prompt)
                if persona.bias_profile:
                    ai_client.set_bias_profile(persona.bias_profile)
                if persona.preferred_models:
                    preferred_models = persona.preferred_models
                if persona.tool_preset:
                    persona_tool_preset = persona.tool_preset
        except Exception as e:
            print(f"[WARN] Failed to load persona {context.persona_id}: {e}")

    # Apply tool preset: use persona's tool_preset if available, otherwise
    # fall back to context.tool_preset.
    effective_tool_preset = persona_tool_preset or context.tool_preset
    ai_client.set_tool_preset(effective_tool_preset)

    # FIRST abort check: before any major work.
    if engine and hasattr(engine, "_abort_events"):
        abort_event = engine._abort_events.get(ticket.id)
        if abort_event and abort_event.is_set():
            print(f"[MMA] Ticket {ticket.id} aborted early.")
            ticket.status = "killed"
            if event_queue:
                # "ticket_completed" is also emitted for kills so the GUI
                # stops tracking this ticket.
                _queue_put(event_queue, "ticket_completed", {"ticket_id": ticket.id, "timestamp": time.time()})
            return "ABORTED"

    # Context pruning: token counts before/after view reduction.
    context_injection = ""
    tokens_before = 0
    tokens_after = 0

    def _count_tokens(text: str) -> int:
        # Rough estimate: ~4 characters per token.
        return len(text) // 4

    if context_files:
        parser = ASTParser(language="python")
        # Persona may override how context files are aggregated.
        strategy = getattr(persona, "aggregation_strategy", "auto") if persona else "auto"
        for i, file_path in enumerate(context_files):
            try:
                Path(file_path)
                # (This is a bit simplified, but helps)
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                tokens_before += _count_tokens(content)

                if strategy == "summarize":
                    view = summarize.summarise_file(Path(file_path), content)
                elif strategy == "full":
                    view = content
                else:  # auto or skeleton
                    # First file gets the richest view; the ticket's own
                    # target file gets a symbol-targeted view; the rest
                    # are reduced to skeletons.
                    if i == 0:
                        view = parser.get_curated_view(content, path=file_path)
                    elif ticket.target_file and Path(file_path).resolve() == Path(ticket.target_file).resolve() and ticket.target_symbols:
                        view = parser.get_targeted_view(content, ticket.target_symbols, path=file_path)
                    else:
                        view = parser.get_skeleton(content, path=file_path)

                tokens_after += _count_tokens(view)
                context_injection += f"\nFile: {file_path}\n{view}\n"
            except Exception as e:
                # Unreadable files are reported inline rather than failing
                # the whole worker.
                context_injection += f"\nError reading {file_path}: {e}\n"

    if tokens_before > 0:
        reduction = ((tokens_before - tokens_after) / tokens_before) * 100
        print(f"[MMA] Context pruning for {ticket.id}: {tokens_before} -> {tokens_after} tokens ({reduction:.1f}% reduction)")

    # Build a prompt for the worker.
    user_message = (
        f"You are assigned to Ticket {ticket.id}.\n"
        f"Task Description: {ticket.description}\n"
    )
    if context_injection:
        user_message += f"\nContext Files:\n{context_injection}\n"
    user_message += (
        "Please complete this task. If you are blocked and cannot proceed, "
        "start your response with 'BLOCKED' and explain why."
    )
    # HITL Clutch: call confirm_spawn if event_queue is provided. The user
    # may approve, reject, or edit the prompt/context before the AI call.
    if event_queue:
        approved, modified_prompt, modified_context = confirm_spawn(
            role="Tier 3 Worker",
            prompt=user_message,
            context_md=md_content,
            event_queue=event_queue,
            ticket_id=ticket.id
        )
        if not approved:
            ticket.mark_blocked("Spawn rejected by user.")
            if event_queue:
                _queue_put(event_queue, "ticket_completed", {"ticket_id": ticket.id, "timestamp": time.time()})
            return "BLOCKED: Spawn rejected by user."
        user_message = modified_prompt
        md_content = modified_context
    # HITL Clutch: pass the queue and ticket_id to confirm_execution.

    def clutch_callback(payload: str) -> bool:
        # Per-tool-call approval gate (only wired in when ticket.step_mode).
        if not event_queue:
            return True
        # SECONDARY abort check: before executing any tool.
        if engine and hasattr(engine, "_abort_events"):
            abort_event = engine._abort_events.get(ticket.id)
            if abort_event and abort_event.is_set():
                print(f"[MMA] Ticket {ticket.id} aborted during clutch_callback.")
                return False  # Reject tool execution
        return confirm_execution(payload, event_queue, ticket.id)

    def stream_callback(chunk: str) -> None:
        # Forward streamed model output to the GUI, tagged by worker.
        if event_queue:
            _queue_put(event_queue, 'mma_stream', {'stream_id': f'Tier 3 (Worker): {ticket.id}', 'text': chunk})

    # Save the current comms callback so it can be restored in `finally`,
    # and chained to from the worker-specific callback below.
    old_comms_cb = ai_client.get_comms_log_callback()

    def worker_comms_callback(entry: dict) -> None:
        # Tag every comms entry with this ticket and surface tool
        # call/result summaries to the GUI stream.
        entry["mma_ticket_id"] = ticket.id
        if event_queue:
            kind = entry.get("kind")
            payload = entry.get("payload", {})
            chunk = ""
            if kind == "tool_call":
                chunk = f"\n\n[TOOL CALL] {payload.get('name')}\n{json.dumps(payload.get('script') or payload.get('args'))}\n"
            elif kind == "tool_result":
                res = str(payload.get("output", ""))
                # Keep GUI payloads small.
                if len(res) > 500: res = res[:500] + "... (truncated)"
                chunk = f"\n[TOOL RESULT]\n{res}\n"

            if chunk:
                _queue_put(event_queue, "response", {"text": chunk, "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "streaming..."})
        if old_comms_cb:
            old_comms_cb(entry)

    ai_client.set_comms_log_callback(worker_comms_callback)
    ai_client.set_current_tier(f"Tier 3 (Worker): {ticket.id}")
    try:
        # Baseline index into the comms log: entries after this belong to
        # this worker's send() and are used for token accounting below.
        comms_baseline = len(ai_client.get_comms_log())
        response = ai_client.send(
            md_content=md_content,
            user_message=user_message,
            base_dir=".",
            pre_tool_callback=clutch_callback if ticket.step_mode else None,
            qa_callback=ai_client.run_tier4_analysis,
            patch_callback=ai_client.run_tier4_patch_callback,
            stream_callback=stream_callback
        )
    finally:
        # Always restore global ai_client state, even if send() raised.
        ai_client.set_comms_log_callback(old_comms_cb)
        ai_client.set_current_tier(None)

    # THIRD abort check: after the blocking send() returns.
    if engine and hasattr(engine, "_abort_events"):
        abort_event = engine._abort_events.get(ticket.id)
        if abort_event and abort_event.is_set():
            print(f"[MMA] Ticket {ticket.id} aborted after AI call.")
            ticket.status = "killed"
            if event_queue:
                _queue_put(event_queue, "ticket_completed", {"ticket_id": ticket.id, "timestamp": time.time()})
            return "ABORTED"

    if event_queue:
        # Push via "response" event type — _process_event_queue wraps this
        # as {"action": "handle_ai_response", "payload": ...} for the GUI.
        try:
            response_payload = {
                "text": response,
                "stream_id": f"Tier 3 (Worker): {ticket.id}",
                "status": "done"
            }
            print(f"[MMA] Pushing Tier 3 response for {ticket.id}, stream_id={response_payload['stream_id']}")
            _queue_put(event_queue, "response", response_payload)
        except Exception as e:
            print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")

    # Update usage in engine if provided: sum input/output tokens from the
    # comms-log entries produced by this worker's send().
    if engine:
        _new_comms = ai_client.get_comms_log()[comms_baseline:]
        _resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
        _in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
        _out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
        engine.update_usage("Tier 3", _in_tokens, _out_tokens)
        engine._dirty = True
    # The worker contract: a response beginning with (or containing)
    # 'BLOCKED' marks the ticket blocked; anything else marks it complete.
    if "BLOCKED" in response.upper():
        ticket.mark_blocked(response)
        if event_queue:
            _queue_put(event_queue, "response", {"text": f"\n\n[STATUS] BLOCKED: {response}", "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done"})
    else:
        ticket.mark_complete()
        if event_queue:
            _queue_put(event_queue, "response", {"text": "\n\n[STATUS] COMPLETED", "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done"})

    if event_queue:
        _queue_put(event_queue, "ticket_completed", {"ticket_id": ticket.id, "timestamp": time.time()})
    return response
|
|
|