manual_slop/src/app_controller.py
f8e1a5b405 feat(tier4): Complete GUI integration for patch modal
- Add patch modal state to AppController instead of App
- Add show_patch_modal/hide_patch_modal action handlers
- Fix push_event to work with flat payload format
- Add patch fields to _gettable_fields
- Both GUI integration tests passing
2026-03-07 00:55:35 -05:00

import threading
import time
import sys
import os
from typing import Any, List, Dict, Optional, Callable
from pathlib import Path
import json
import uuid
import tomli_w
import requests
from dataclasses import asdict
from datetime import datetime
from tkinter import filedialog, Tk
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from src import events
from src import paths
from src import session_logger
from src import project_manager
from src import performance_monitor
from src import models
from src.file_cache import ASTParser
from src import ai_client
from src import shell_runner
from src import mcp_client
from src import aggregate
from src import orchestrator_pm
from src import conductor_tech_lead
from src import multi_agent_conductor
from src import theme
def hide_tk_root() -> Tk:
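    """Create a hidden, always-on-top Tk root so native file dialogs can be shown without a visible window."""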
root = Tk()
root.withdraw()
root.wm_attributes("-topmost", True)
return root
class GenerateRequest(BaseModel):
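    """Request body for the /api/v1/generate endpoint."""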
prompt: str
auto_add_history: bool = True
temperature: float | None = None
max_tokens: int | None = None
class ConfirmRequest(BaseModel):
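    """Request body for /api/v1/confirm/{action_id}; optionally carries an edited script."""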
approved: bool
script: Optional[str] = None
class ConfirmDialog:
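    """Synchronization object for a pending PowerShell script confirmation.

    A worker thread blocks in wait() while the GUI or the headless API
    approves, rejects, or edits the script.
    """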
def __init__(self, script: str, base_dir: str) -> None:
self._uid = str(uuid.uuid4())
self._script = str(script) if script is not None else ""
self._base_dir = str(base_dir) if base_dir is not None else ""
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
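        """Block until resolved or a 120-second timeout elapses; returns (approved, script)."""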
start_time = time.time()
with self._condition:
while not self._done:
if time.time() - start_time > 120:
return False, self._script
self._condition.wait(timeout=0.1)
return self._approved, self._script
class MMAApprovalDialog:
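    """Synchronization object for a pending MMA step approval, resolved via _handle_mma_respond."""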
def __init__(self, ticket_id: str, payload: str) -> None:
self._payload = payload
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
start_time = time.time()
with self._condition:
while not self._done:
if time.time() - start_time > 120:
return False, self._payload
self._condition.wait(timeout=0.1)
return self._approved, self._payload
class MMASpawnApprovalDialog:
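    """Synchronization object for a pending MMA agent-spawn approval; wait() returns a dict with approved/abort flags and the (possibly edited) prompt and context."""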
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None:
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
self._done = False
self._approved = False
self._abort = False
def wait(self) -> dict[str, Any]:
start_time = time.time()
with self._condition:
while not self._done:
if time.time() - start_time > 120:
return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md}
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
'abort': self._abort,
'prompt': self._prompt,
'context_md': self._context_md
}
class AppController:
"""
The headless controller for the Manual Slop application.
Owns the application state and manages background services.
"""
PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek", "minimax"]
def __init__(self):
# Initialize locks first to avoid initialization order issues
self._send_thread_lock: threading.Lock = threading.Lock()
self._disc_entries_lock: threading.Lock = threading.Lock()
self._pending_comms_lock: threading.Lock = threading.Lock()
self._pending_tool_calls_lock: threading.Lock = threading.Lock()
self._pending_history_adds_lock: threading.Lock = threading.Lock()
self._pending_gui_tasks_lock: threading.Lock = threading.Lock()
self._pending_dialog_lock: threading.Lock = threading.Lock()
self._api_event_queue_lock: threading.Lock = threading.Lock()
self.config: Dict[str, Any] = {}
self.project: Dict[str, Any] = {}
self.active_project_path: str = ""
self.project_paths: List[str] = []
self.active_discussion: str = "main"
self.disc_entries: List[Dict[str, Any]] = []
self.disc_roles: List[str] = []
self.files: List[str] = []
self.screenshots: List[str] = []
self.event_queue: events.SyncEventQueue = events.SyncEventQueue()
self._loop_thread: Optional[threading.Thread] = None
self.tracks: List[Dict[str, Any]] = []
self.active_track: Optional[models.Track] = None
self.active_tickets: List[Dict[str, Any]] = []
self.mma_streams: Dict[str, str] = {}
self._pending_patch_text: Optional[str] = None
self._pending_patch_files: List[str] = []
self._show_patch_modal: bool = False
self._patch_error_message: Optional[str] = None
self.mma_status: str = "idle"
self._tool_log: List[Dict[str, Any]] = []
self._comms_log: List[Dict[str, Any]] = []
self.session_usage: Dict[str, Any] = {
"input_tokens": 0,
"output_tokens": 0,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
"last_latency": 0.0
}
self.mma_tier_usage: Dict[str, Dict[str, Any]] = {
"Tier 1": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3.1-pro-preview"},
"Tier 2": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3-flash-preview"},
"Tier 3": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite"},
"Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite"},
}
self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor()
self._pending_gui_tasks: List[Dict[str, Any]] = []
self._api_event_queue: List[Dict[str, Any]] = []
# Pending dialogs state moved from App
self._pending_dialog: Optional[ConfirmDialog] = None
self._pending_dialog_open: bool = False
self._pending_actions: Dict[str, ConfirmDialog] = {}
self._pending_ask_dialog: bool = False
# AI settings state
self._current_provider: str = "gemini"
self._current_model: str = "gemini-2.5-flash-lite"
self.temperature: float = 0.0
self.max_tokens: int = 8192
self.history_trunc_limit: int = 8000
# UI-related state moved to controller
self.ui_ai_input: str = ""
self.ui_disc_new_name_input: str = ""
self.ui_disc_new_role_input: str = ""
self.ui_epic_input: str = ""
self.ui_new_track_name: str = ""
self.ui_new_track_desc: str = ""
self.ui_new_track_type: str = "feature"
self.ui_conductor_setup_summary: str = ""
self.ui_last_script_text: str = ""
self.ui_last_script_output: str = ""
self.ui_new_ticket_id: str = ""
self.ui_new_ticket_desc: str = ""
self.ui_new_ticket_target: str = ""
self.ui_new_ticket_deps: str = ""
self.ui_output_dir: str = ""
self.ui_files_base_dir: str = ""
self.ui_shots_base_dir: str = ""
self.ui_project_git_dir: str = ""
self.ui_project_main_context: str = ""
self.ui_project_system_prompt: str = ""
self.ui_gemini_cli_path: str = "gemini"
self.ui_word_wrap: bool = True
self.ui_summary_only: bool = False
self.ui_auto_add_history: bool = False
self.ui_global_system_prompt: str = ""
self.ui_agent_tools: Dict[str, bool] = {}
self.available_models: List[str] = []
self.all_available_models: Dict[str, List[str]] = {} # provider -> list of models
self._autofocus_response_tab = False
self.proposed_tracks: List[Dict[str, Any]] = []
self._show_track_proposal_modal: bool = False
self.ai_status: str = 'idle'
self.ai_response: str = ''
self.last_md: str = ''
self.last_md_path: Optional[Path] = None
self.last_file_items: List[Any] = []
self.send_thread: Optional[threading.Thread] = None
self.models_thread: Optional[threading.Thread] = None
self.show_windows: Dict[str, bool] = {}
self.show_script_output: bool = False
self.show_text_viewer: bool = False
self.text_viewer_title: str = ''
self.text_viewer_content: str = ''
self._pending_comms: List[Dict[str, Any]] = []
self._pending_tool_calls: List[Dict[str, Any]] = []
self._pending_history_adds: List[Dict[str, Any]] = []
self.perf_history: Dict[str, List[float]] = {'frame_time': [0.0]*100, 'fps': [0.0]*100, 'cpu': [0.0]*100, 'input_lag': [0.0]*100}
self._perf_last_update: float = 0.0
self._autosave_interval: float = 60.0
self._last_autosave: float = time.time()
# More state moved from App
self._ask_dialog_open: bool = False
self._ask_request_id: Optional[str] = None
self._ask_tool_data: Optional[Dict[str, Any]] = None
self.mma_step_mode: bool = False
self.active_tier: Optional[str] = None
self.ui_focus_agent: Optional[str] = None
self._pending_mma_approval: Optional[Dict[str, Any]] = None
self._mma_approval_open: bool = False
self._mma_approval_edit_mode: bool = False
self._mma_approval_payload: str = ""
self._pending_mma_spawn: Optional[Dict[str, Any]] = None
self._mma_spawn_open: bool = False
self._mma_spawn_edit_mode: bool = False
self._mma_spawn_prompt: str = ''
self._mma_spawn_context: str = ''
self._trigger_blink: bool = False
self._is_blinking: bool = False
self._blink_start_time: float = 0.0
self._trigger_script_blink: bool = False
self._is_script_blinking: bool = False
self._script_blink_start_time: float = 0.0
self._scroll_disc_to_bottom: bool = False
self._scroll_comms_to_bottom: bool = False
self._scroll_tool_calls_to_bottom: bool = False
self._gemini_cache_text: str = ""
self._last_stable_md: str = ''
self._token_stats: Dict[str, Any] = {}
self._token_stats_dirty: bool = False
self.ui_disc_truncate_pairs: int = 2
self.ui_auto_scroll_comms: bool = True
self.ui_auto_scroll_tool_calls: bool = True
self._show_add_ticket_form: bool = False
self._track_discussion_active: bool = False
self._tier_stream_last_len: Dict[str, int] = {}
self.is_viewing_prior_session: bool = False
self.prior_session_entries: List[Dict[str, Any]] = []
self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
self.ui_manual_approve: bool = False
self._settable_fields: Dict[str, str] = {
'ai_input': 'ui_ai_input',
'project_git_dir': 'ui_project_git_dir',
'auto_add_history': 'ui_auto_add_history',
'disc_new_name_input': 'ui_disc_new_name_input',
'project_main_context': 'ui_project_main_context',
'gcli_path': 'ui_gemini_cli_path',
'output_dir': 'ui_output_dir',
'files_base_dir': 'ui_files_base_dir',
'ai_status': 'ai_status',
'ai_response': 'ai_response',
'active_discussion': 'active_discussion',
'current_provider': 'current_provider',
'current_model': 'current_model',
'token_budget_pct': '_token_budget_pct',
'token_budget_current': '_token_budget_current',
'token_budget_label': '_token_budget_label',
'show_confirm_modal': 'show_confirm_modal',
'mma_epic_input': 'ui_epic_input',
'mma_status': 'mma_status',
'mma_active_tier': 'active_tier',
'ui_new_track_name': 'ui_new_track_name',
'ui_new_track_desc': 'ui_new_track_desc',
'manual_approve': 'ui_manual_approve'
}
self._gettable_fields = dict(self._settable_fields)
self._gettable_fields.update({
'ui_focus_agent': 'ui_focus_agent',
'active_discussion': 'active_discussion',
'_track_discussion_active': '_track_discussion_active',
'proposed_tracks': 'proposed_tracks',
'mma_streams': 'mma_streams',
'active_track': 'active_track',
'active_tickets': 'active_tickets',
'tracks': 'tracks',
'thinking_indicator': 'thinking_indicator',
'operations_live_indicator': 'operations_live_indicator',
'prior_session_indicator': 'prior_session_indicator',
'_show_patch_modal': '_show_patch_modal',
'_pending_patch_text': '_pending_patch_text',
'_pending_patch_files': '_pending_patch_files'
})
self._init_actions()
@property
def thinking_indicator(self) -> bool:
return self.ai_status in ("sending...", "streaming...")
@property
def operations_live_indicator(self) -> bool:
return not self.is_viewing_prior_session
@property
def prior_session_indicator(self) -> bool:
return self.is_viewing_prior_session
def _init_actions(self) -> None:
# Set up state-related action maps
self._clickable_actions: dict[str, Callable[..., Any]] = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_ask,
'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True),
'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True),
'btn_prune_logs': self.cb_prune_logs,
}
self._predefined_callbacks: dict[str, Callable[..., Any]] = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file,
'_set_env_var': lambda k, v: os.environ.update({k: v})
}
def _update_gcli_adapter(self, path: str) -> None:
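        """Create or repoint the shared Gemini CLI adapter at the given binary path."""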
sys.stderr.write(f"[DEBUG] _update_gcli_adapter called with: {path}\n")
sys.stderr.flush()
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(path))
else:
ai_client._gemini_cli_adapter.binary_path = str(path)
def _set_status(self, status: str) -> None:
"""Thread-safe update of ai_status via the GUI task queue."""
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "set_ai_status",
"payload": status
})
def _set_mma_status(self, status: str) -> None:
"""Thread-safe update of mma_status via the GUI task queue."""
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "set_mma_status",
"payload": status
})
def _process_pending_gui_tasks(self) -> None:
if not self._pending_gui_tasks:
return
sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n")
sys.stderr.flush()
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
sys.stderr.write(f"[DEBUG] Processing GUI task: action={action}\n")
sys.stderr.flush()
if action:
session_logger.log_api_hook("PROCESS_TASK", action, str(task))
# ...
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None)
elif action == "set_ai_status":
self.ai_status = task.get("payload", "")
sys.stderr.write(f"[DEBUG] Updated ai_status via task to: {self.ai_status}\n")
sys.stderr.flush()
elif action == "set_mma_status":
self.mma_status = task.get("payload", "")
elif action == "handle_ai_response":
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
is_streaming = payload.get("status") == "streaming..."
if stream_id:
if is_streaming:
if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
else:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
if is_streaming:
self.ai_response += text
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
sys.stderr.write(f"[DEBUG] Updated ai_status to: {self.ai_status}\n")
sys.stderr.flush()
self._trigger_blink = True
if not stream_id:
self._token_stats_dirty = True
if not is_streaming:
self._autofocus_response_tab = True
# ONLY add to history when turn is complete
if self.ui_auto_add_history and not stream_id and not is_streaming:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": True,
"ts": project_manager.now_ts()
})
elif action in ("mma_stream", "mma_stream_append"):
# Some events might have these at top level, some in a 'payload' dict
stream_id = task.get("stream_id") or task.get("payload", {}).get("stream_id")
text = task.get("text") or task.get("payload", {}).get("text", "")
if stream_id:
if stream_id not in self.mma_streams:
self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
# Handle both internal (nested) and hook-server (flattened) payloads
p = task.get("payload")
if not isinstance(p, dict):
p = task # Fallback to task itself if payload is missing or wrong type
sys.stderr.write(f"[DEBUG] mma_state_update: status={p.get('status')} active_tier={p.get('active_tier')}\n")
sys.stderr.flush()
self.mma_status = p.get("status", self.mma_status)
self.active_tier = p.get("active_tier", self.active_tier)
# Preserve existing model/provider config if not explicitly in payload
new_usage = p.get("tier_usage", {})
for tier, data in new_usage.items():
if tier in self.mma_tier_usage:
# Update usage counts but keep selected model/provider if not in update
self.mma_tier_usage[tier]["input"] = data.get("input", self.mma_tier_usage[tier]["input"])
self.mma_tier_usage[tier]["output"] = data.get("output", self.mma_tier_usage[tier]["output"])
if "model" in data: self.mma_tier_usage[tier]["model"] = data["model"]
if "provider" in data: self.mma_tier_usage[tier]["provider"] = data["provider"]
else:
self.mma_tier_usage[tier] = data
self.active_tickets = p.get("tickets", [])
track_data = p.get("track")
if track_data:
tickets = []
for t_data in self.active_tickets:
if isinstance(t_data, models.Ticket):
tickets.append(t_data)
else:
# Map 'goal' from Godot format to 'description' if needed
if "goal" in t_data and "description" not in t_data:
t_data["description"] = t_data["goal"]
tickets.append(models.Ticket.from_dict(t_data))
self.active_track = models.Track(
id=track_data.get("id"),
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
item = task.get("item")
value = task.get("value")
sys.stderr.write(f"[DEBUG] Processing set_value: {item}={value}\n")
sys.stderr.flush()
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
sys.stderr.write(f"[DEBUG] Set {attr_name} to {value}\n")
sys.stderr.flush()
if item == "gcli_path":
self._update_gcli_adapter(str(value))
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
sys.stderr.write(f"[DEBUG] Processing click: {item} (user_data={user_data})\n")
sys.stderr.flush()
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(str(user_data or ""))
elif item in self._clickable_actions:
import inspect
func = self._clickable_actions[item]
                        try:
                            sig = inspect.signature(func)
                            accepts_user_data = 'user_data' in sig.parameters
                        except (ValueError, TypeError):
                            # Signature inspection can fail for some callables;
                            # fall back to a no-arg call. Errors raised by the
                            # handler itself now propagate to the task-level
                            # error handler instead of triggering a second call.
                            accepts_user_data = False
                        if accepts_user_data:
                            func(user_data=user_data)
                        else:
                            func()
elif action == "select_list_item":
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(str(value or ""))
                    elif task.get("type") == "ask":
                        # Ask events from the hook server carry 'type' instead of 'action'.
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or ""))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "show_patch_modal":
self._pending_patch_text = task.get("patch_text", "")
self._pending_patch_files = task.get("file_paths", [])
self._show_patch_modal = True
elif action == "hide_patch_modal":
self._show_patch_modal = False
self._pending_patch_text = None
self._pending_patch_files = []
elif action == "mma_spawn_approval":
spawn_dlg = MMASpawnApprovalDialog(
str(task.get("ticket_id") or ""),
str(task.get("role") or ""),
str(task.get("prompt") or ""),
str(task.get("context_md") or "")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = spawn_dlg
except Exception as e:
import traceback
sys.stderr.write(f"[DEBUG] Error executing GUI task: {e}\n{traceback.format_exc()}\n")
sys.stderr.flush()
print(f"Error executing GUI task: {e}")
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
self._scroll_disc_to_bottom = True
        for item in items:
            role = item.get("role")
            if role and role not in self.disc_roles:
                self.disc_roles.append(role)
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
if item.get("disc_title", self.active_discussion) == self.active_discussion:
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""
callback_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests", "artifacts", "temp_callback_output.txt")
os.makedirs(os.path.dirname(callback_path), exist_ok=True)
with open(callback_path, "w") as f:
f.write(data)
def _handle_approve_script(self, user_data=None) -> None:
"""Approves the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def _handle_reject_script(self, user_data=None) -> None:
"""Rejects the currently pending PowerShell script."""
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
self._pending_dialog = None
def init_state(self):
"""Initializes the application state from configurations."""
self.config = models.load_config()
ai_cfg = self.config.get("ai", {})
self._current_provider = ai_cfg.get("provider", "gemini")
self._current_model = ai_cfg.get("model", "gemini-2.5-flash-lite")
self.temperature = ai_cfg.get("temperature", 0.0)
self.max_tokens = ai_cfg.get("max_tokens", 8192)
self.history_trunc_limit = ai_cfg.get("history_trunc_limit", 8000)
projects_cfg = self.config.get("projects", {})
self.project_paths = list(projects_cfg.get("paths", []))
self.active_project_path = projects_cfg.get("active", "")
self._load_active_project()
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System"]))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
with self._disc_entries_lock:
self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles)
# UI state
self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".")
proj_meta = self.project.get("project", {})
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
self._update_gcli_adapter(self.ui_gemini_cli_path)
self.ui_word_wrap = proj_meta.get("word_wrap", True)
self.ui_summary_only = proj_meta.get("summary_only", False)
self.ui_auto_add_history = disc_sec.get("auto_add", False)
self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "")
_default_windows = {
"Context Hub": True,
"Files & Media": True,
"AI Settings": True,
"MMA Dashboard": True,
"Tier 1: Strategy": True,
"Tier 2: Tech Lead": True,
"Tier 3: Workers": True,
"Tier 4: QA": True,
"Discussion Hub": True,
"Operations Hub": True,
"Message": False,
"Response": False,
"Tool Calls": False,
"Theme": True,
"Log Management": False,
"Diagnostics": False,
}
saved = self.config.get("gui", {}).get("show_windows", {})
self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()}
agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES}
label = self.project.get("project", {}).get("name", "")
session_logger.open_session(label=label)
def cb_load_prior_log(self) -> None:
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
initialdir=str(paths.get_logs_dir()),
filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")]
)
root.destroy()
if not path:
return
entries = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
self._set_status(f"log load error: {e}")
return
self.prior_session_entries = entries
self.is_viewing_prior_session = True
self._set_status(f"viewing prior session: {Path(path).name} ({len(entries)} entries)")
def cb_prune_logs(self) -> None:
"""Manually triggers the log pruning process with aggressive thresholds."""
self._set_status("Manual prune started (Age > 0d, Size < 100KB)...")
def run_manual_prune() -> None:
try:
from src import log_registry
from src import log_pruner
registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml"))
pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir()))
# Aggressive: Prune anything not whitelisted, even if just created, if under 100KB
# Note: max_age_days=0 means cutoff is NOW.
pruner.prune(max_age_days=0, min_size_kb=100)
self._set_status("Manual prune complete.")
except Exception as e:
self._set_status(f"Manual prune error: {e}")
print(f"Error during manual log pruning: {e}")
thread = threading.Thread(target=run_manual_prune, daemon=True)
thread.start()
def _load_active_project(self) -> None:
"""Loads the active project configuration, with fallbacks."""
if self.active_project_path and Path(self.active_project_path).exists():
try:
self.project = project_manager.load_project(self.active_project_path)
return
except Exception as e:
print(f"Failed to load project {self.active_project_path}: {e}")
for pp in self.project_paths:
if Path(pp).exists():
try:
self.project = project_manager.load_project(pp)
self.active_project_path = pp
return
except Exception:
continue
self.project = project_manager.migrate_from_legacy_config(self.config)
name = self.project.get("project", {}).get("name", "project")
fallback_path = f"{name}.toml"
project_manager.save_project(self.project, fallback_path)
self.active_project_path = fallback_path
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _prune_old_logs(self) -> None:
"""Asynchronously prunes old insignificant logs on startup."""
def run_prune() -> None:
try:
from src import log_registry
from src import log_pruner
registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml"))
pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir()))
pruner.prune()
except Exception as e:
print(f"Error during log pruning: {e}")
thread = threading.Thread(target=run_prune, daemon=True)
thread.start()
def _fetch_models(self, provider: str) -> None:
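        """Asynchronously fetches model lists for all providers and selects a valid model for the current provider."""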
self._set_status("fetching models...")
def do_fetch() -> None:
try:
for p in self.PROVIDERS:
try:
self.all_available_models[p] = ai_client.list_models(p)
except Exception as e:
sys.stderr.write(f"[DEBUG] Error fetching models for {p}: {e}\n")
self.all_available_models[p] = []
models_list = self.all_available_models.get(provider, [])
self.available_models = models_list
if self.current_model not in models_list and models_list:
self.current_model = models_list[0]
ai_client.set_provider(self._current_provider, self.current_model)
self._set_status(f"models loaded: {len(models_list)}")
except Exception as e:
self._set_status(f"model fetch error: {e}")
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
def start_services(self, app: Any = None):
"""Starts background threads."""
sys.stderr.write("[DEBUG] AppController.start_services called\n")
sys.stderr.flush()
self._prune_old_logs()
self._init_ai_and_hooks(app)
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
sys.stderr.write(f"[DEBUG] _loop_thread started: {self._loop_thread.ident}\n")
sys.stderr.flush()
def shutdown(self) -> None:
"""Stops background threads and cleans up resources."""
from src import ai_client
ai_client.cleanup()
if hasattr(self, 'hook_server') and self.hook_server:
self.hook_server.stop()
self.event_queue.put("shutdown", None)
if self._loop_thread and self._loop_thread.is_alive():
self._loop_thread.join(timeout=2.0)
def _init_ai_and_hooks(self, app: Any = None) -> None:
from src import api_hooks
ai_client.set_provider(self._current_provider, self._current_model)
if self._current_provider == "gemini_cli":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
self.perf_monitor.alert_callback = self._on_performance_alert
ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw))
ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw))
ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw))
self.hook_server = api_hooks.HookServer(app if app else self)
self.hook_server.start()
def _run_event_loop(self):
"""Internal loop runner."""
def queue_fallback() -> None:
while True:
try:
if hasattr(self, '_process_pending_gui_tasks'):
self._process_pending_gui_tasks()
if hasattr(self, '_process_pending_history_adds'):
self._process_pending_history_adds()
                except Exception:
                    pass
time.sleep(0.1)
fallback_thread = threading.Thread(target=queue_fallback, daemon=True)
fallback_thread.start()
self._process_event_queue()
def _process_event_queue(self) -> None:
"""Listens for and processes events from the SyncEventQueue."""
sys.stderr.write("[DEBUG] _process_event_queue entered\n")
sys.stderr.flush()
while True:
event_name, payload = self.event_queue.get()
sys.stderr.write(f"[DEBUG] _process_event_queue got event: {event_name} with payload: {str(payload)[:100]}\n")
sys.stderr.flush()
if event_name == "shutdown":
break
if event_name == "user_request":
threading.Thread(target=self._handle_request_event, args=(payload,), daemon=True).start()
elif event_name == "gui_task":
with self._pending_gui_tasks_lock:
# Directly append the task from the hook server.
# It already contains 'action' and any necessary fields.
self._pending_gui_tasks.append(payload)
elif event_name == "mma_state_update":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_state_update",
"payload": payload
})
elif event_name == "mma_stream":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_stream_append",
"payload": payload
})
elif event_name in ("mma_spawn_approval", "mma_step_approval"):
with self._pending_gui_tasks_lock:
# These payloads already contain the 'action' field
self._pending_gui_tasks.append(payload)
elif event_name == "response":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": payload
})
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": "response", "payload": payload})
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
"""Processes a UserRequestEvent by calling the AI client."""
ai_client.set_current_tier(None) # Ensure main discussion is untagged
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": event.prompt,
"collapsed": True,
"ts": project_manager.now_ts()
})
# Clear response area for new turn
self.ai_response = ""
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
# Force update adapter path right before send to bypass potential duplication issues
self._update_gcli_adapter(self.ui_gemini_cli_path)
sys.stderr.write(f"[DEBUG] Calling ai_client.send with provider={ai_client.get_provider()}, model={self.current_model}, gcli_path={self.ui_gemini_cli_path}\n")
sys.stderr.flush()
try:
resp = ai_client.send(
event.stable_md,
event.prompt,
event.base_dir,
event.file_items,
event.disc_text,
stream=True,
stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback
)
self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"})
except ai_client.ProviderError as e:
sys.stderr.write(f"[DEBUG] _handle_request_event ai_client.ProviderError: {e.ui_message()}\n")
sys.stderr.flush()
self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"})
except Exception as e:
import traceback
sys.stderr.write(f"[DEBUG] _handle_request_event ERROR: {e}\n{traceback.format_exc()}\n")
sys.stderr.flush()
self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"})
def _on_ai_stream(self, text: str) -> None:
"""Handles streaming text from the AI."""
self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"})
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
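        """Logs a comms entry, accumulates token usage, mirrors tool and history entries into the discussion, and queues the rest for display."""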
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
kind = entry.get("kind")
payload = entry.get("payload", {})
        if kind == "response" and "usage" in payload:
            u = payload["usage"]
            for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]:
                # session_usage does not pre-seed "total_tokens", so use .get()
                # to avoid a KeyError on the first response that reports it.
                if k in u:
                    self.session_usage[k] = self.session_usage.get(k, 0) + (u.get(k, 0) or 0)
if kind in ("tool_result", "tool_call"):
role = "Tool" if kind == "tool_result" else "Vendor API"
content = ""
if kind == "tool_result":
content = payload.get("output", "")
else:
content = payload.get("script") or payload.get("args") or payload.get("message", "")
if isinstance(content, dict):
content = json.dumps(content, indent=1)
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": f"[{kind.upper().replace('_', ' ')}]\n{content}",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str) -> None:
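        """Logs a tool call and queues it for the Tool Calls view, tagged with the originating tier."""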
session_logger.log_tool_call(script, result, None)
source_tier = ai_client.get_current_tier()
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None:
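        """Forwards API client events to the GUI metrics refresh and, under test hooks, to the API event queue."""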
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": event_name, "payload": payload})
def _on_performance_alert(self, message: str) -> None:
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "System",
"content": alert_text,
"ts": project_manager.now_ts()
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
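        """Runs a PowerShell script after user confirmation.

        Auto-approves when test hooks are enabled and manual approval is off;
        otherwise blocks on a ConfirmDialog resolved by the GUI or the
        headless confirm endpoint. Returns the script output, or None if
        rejected.
        """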
sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n")
sys.stderr.flush()
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
sys.stderr.write("[DEBUG] Auto-approving script.\n")
sys.stderr.flush()
self._set_status("running powershell...")
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(script, output)
self._set_status("powershell done, awaiting AI...")
return output
sys.stderr.write("[DEBUG] Creating ConfirmDialog.\n")
sys.stderr.flush()
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
sys.stderr.write(f"[DEBUG] Appended script_confirmation_required to _api_event_queue. ID={dialog._uid}\n")
sys.stderr.flush()
sys.stderr.write(f"[DEBUG] Waiting for dialog ID={dialog._uid}...\n")
sys.stderr.flush()
approved, final_script = dialog.wait()
sys.stderr.write(f"[DEBUG] Dialog ID={dialog._uid} finished wait. approved={approved}\n")
sys.stderr.flush()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self._set_status("running powershell...")
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(final_script, output)
self._set_status("powershell done, awaiting AI...")
return output
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None:
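        """Records a script/result pair and triggers the script-output UI affordances."""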
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
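        """Approves or rejects a pending ConfirmDialog by id; returns True if a matching dialog was found."""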
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
elif self._pending_dialog and self._pending_dialog._uid == action_id:
dialog = self._pending_dialog
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
return False
@property
def current_provider(self) -> str:
return self._current_provider
@current_provider.setter
def current_provider(self, value: str) -> None:
if value != self._current_provider:
self._current_provider = value
ai_client.reset_session()
ai_client.set_provider(value, self.current_model)
self._token_stats = {}
self._token_stats_dirty = True
@property
def current_model(self) -> str:
return self._current_model
@current_model.setter
def current_model(self, value: str) -> None:
if value != self._current_model:
self._current_model = value
ai_client.reset_session()
ai_client.set_provider(self.current_provider, value)
self._token_stats = {}
self._token_stats_dirty = True
def create_api(self) -> FastAPI:
"""Creates and configures the FastAPI application for headless mode."""
api = FastAPI(title="Manual Slop Headless API")
API_KEY_NAME = "X-API-KEY"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
        async def get_api_key(header_key: Optional[str] = Depends(api_key_header)) -> str:
"""Validates the API key from the request header against configuration."""
headless_cfg = self.config.get("headless", {})
config_key = headless_cfg.get("api_key", "").strip()
env_key = os.environ.get("SLOP_API_KEY", "").strip()
target_key = env_key or config_key
if not target_key:
raise HTTPException(status_code=403, detail="API Key not configured on server")
if header_key == target_key:
return header_key
raise HTTPException(status_code=403, detail="Could not validate API Key")
@api.get("/health")
def health() -> dict[str, str]:
"""Returns the health status of the API."""
return {"status": "ok"}
@api.get("/api/gui/state", dependencies=[Depends(get_api_key)])
def get_gui_state() -> dict[str, Any]:
"""Returns the current GUI state for specific fields."""
gettable = getattr(self, "_gettable_fields", {})
state = {}
import dataclasses
for key, attr in gettable.items():
val = getattr(self, attr, None)
if dataclasses.is_dataclass(val):
state[key] = dataclasses.asdict(val)
else:
state[key] = val
return state
@api.get("/api/gui/mma_status", dependencies=[Depends(get_api_key)])
def get_mma_status() -> dict[str, Any]:
"""Dedicated endpoint for MMA-related status."""
return {
"mma_status": self.mma_status,
"ai_status": self.ai_status,
"mma_streams": self.mma_streams,
"active_tier": self.active_tier,
"active_tickets": self.active_tickets,
"proposed_tracks": self.proposed_tracks
}
@api.post("/api/gui", dependencies=[Depends(get_api_key)])
def post_gui(req: dict) -> dict[str, str]:
"""Pushes a GUI task to the event queue."""
self.event_queue.put("gui_task", req)
return {"status": "queued"}
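        # Example payloads (illustrative, matching the task handlers in
        # _process_pending_gui_tasks):
        #   {"action": "set_value", "item": "ai_input", "value": "hello"}
        #   {"action": "click", "item": "btn_gen_send"}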
@api.get("/api/session", dependencies=[Depends(get_api_key)])
def get_api_session() -> dict[str, Any]:
"""Returns current discussion session entries."""
with self._disc_entries_lock:
return {"session": {"entries": self.disc_entries}}
@api.post("/api/session", dependencies=[Depends(get_api_key)])
def post_api_session(req: dict) -> dict[str, str]:
"""Updates session entries."""
entries = req.get("entries", [])
with self._disc_entries_lock:
self.disc_entries = entries
return {"status": "updated"}
@api.get("/api/project", dependencies=[Depends(get_api_key)])
def get_api_project() -> dict[str, Any]:
"""Returns current project data."""
return {"project": self.project}
@api.get("/api/performance", dependencies=[Depends(get_api_key)])
def get_performance() -> dict[str, Any]:
"""Returns performance monitor metrics."""
return {"performance": self.perf_monitor.get_metrics()}
@api.get("/api/gui/diagnostics", dependencies=[Depends(get_api_key)])
def get_diagnostics() -> dict[str, Any]:
"""Alias for performance metrics."""
return self.perf_monitor.get_metrics()
@api.get("/status", dependencies=[Depends(get_api_key)])
def status() -> dict[str, Any]:
"""Returns the current status of the application."""
return {
"provider": self.current_provider,
"model": self.current_model,
"status": self.ai_status,
"usage": self.session_usage
}
@api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
def generate(req: GenerateRequest) -> dict[str, Any]:
"""Triggers an AI generation request using the current project context."""
if not req.prompt.strip():
raise HTTPException(status_code=400, detail="Prompt cannot be empty")
with self._send_thread_lock:
start_time = time.time()
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}")
user_msg = req.prompt
base_dir = self.ui_files_base_dir
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
temp = req.temperature if req.temperature is not None else self.temperature
tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens
ai_client.set_model_params(temp, tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": user_msg,
"collapsed": True,
"ts": project_manager.now_ts()
})
try:
resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "AI",
"content": resp,
"collapsed": True,
"ts": project_manager.now_ts()
})
self._recalculate_session_usage()
duration = time.time() - start_time
return {
"text": resp,
"metadata": {
"provider": self.current_provider,
"model": self.current_model,
"duration_sec": round(duration, 3),
"timestamp": project_manager.now_ts()
},
"usage": self.session_usage
}
except ai_client.ProviderError as e:
raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}")
@api.post("/api/v1/stream", dependencies=[Depends(get_api_key)])
async def stream(req: GenerateRequest) -> Any:
"""Placeholder for streaming AI generation responses (Not yet implemented)."""
raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.")
@api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
def pending_actions() -> list[dict[str, Any]]:
"""Lists all pending PowerShell scripts awaiting confirmation."""
with self._pending_dialog_lock:
return [
{"action_id": uid, "script": diag._script, "base_dir": diag._base_dir}
for uid, diag in self._pending_actions.items()
]
@api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]:
"""Approves or rejects a pending action."""
with self._pending_dialog_lock:
if action_id not in self._pending_actions:
raise HTTPException(status_code=404, detail="Action not found")
dialog = self._pending_actions.pop(action_id)
if req.script is not None:
dialog._script = req.script
with dialog._condition:
dialog._approved = req.approved
dialog._done = True
dialog._condition.notify_all()
return {"status": "confirmed" if req.approved else "rejected"}
@api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
def list_sessions() -> list[str]:
"""Lists all session IDs."""
log_dir = paths.get_logs_dir()
if not log_dir.exists():
return []
return [d.name for d in log_dir.iterdir() if d.is_dir()]
@api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)])
def get_session(session_id: str) -> dict[str, Any]:
"""Returns the content of the comms.log for a specific session."""
log_path = paths.get_logs_dir() / session_id / "comms.log"
if not log_path.exists():
raise HTTPException(status_code=404, detail="Session log not found")
return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")}
@api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)])
def delete_session(session_id: str) -> dict[str, str]:
"""Deletes a specific session directory."""
log_path = paths.get_logs_dir() / session_id
if not log_path.exists() or not log_path.is_dir():
raise HTTPException(status_code=404, detail="Session directory not found")
import shutil
shutil.rmtree(log_path)
return {"status": "deleted"}
@api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
def get_context() -> dict[str, Any]:
"""Returns the current aggregated project context."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
# Pull current screenshots if available in project
screenshots = self.project.get("screenshots", {}).get("paths", [])
return {
"files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items],
"screenshots": screenshots,
"files_base_dir": self.ui_files_base_dir,
"markdown": md,
"discussion": disc_text
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}")
@api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)])
def token_stats() -> dict[str, Any]:
"""Returns current token usage and budget statistics."""
return self._token_stats
return api
def _cb_new_project_automated(self, user_data: Any) -> None:
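        """Creates and switches to a new default project at the path given in user_data."""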
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self) -> None:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
models.save_config(self.config)
self._set_status("config saved")
def _cb_disc_create(self) -> None:
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _switch_project(self, path: str) -> None:
if not Path(path).exists():
self._set_status(f"project file not found: {path}")
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self._set_status(f"failed to load project: {e}")
return
self._refresh_from_project()
self._set_status(f"switched to: {Path(path).stem}")
def _refresh_from_project(self) -> None:
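        """Reloads files, discussions, UI settings, agent tools, and MMA track state from the active project."""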
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System"]))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
with self._disc_entries_lock:
self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles)
proj = self.project
self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".")
proj_meta = self.project.get("project", {})
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True)
self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True)
self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True)
self.ui_summary_only = proj.get("project", {}).get("summary_only", False)
agent_tools_cfg = proj.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
# Restore MMA state
mma_sec = proj.get("mma", {})
self.ui_epic_input = mma_sec.get("epic", "")
at_data = mma_sec.get("active_track")
if at_data:
try:
tickets = []
for t_data in at_data.get("tickets", []):
tickets.append(models.Ticket(**t_data))
self.active_track = models.Track(
id=at_data.get("id"),
description=at_data.get("description"),
tickets=tickets
)
self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table
except Exception as e:
print(f"Failed to deserialize active track: {e}")
self.active_track = None
else:
self.active_track = None
self.active_tickets = []
# Load track-scoped history if track is active
if self.active_track:
track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
if track_history:
with self._disc_entries_lock:
self.disc_entries = models.parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str) -> None:
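        """Loads a persisted track's state and history into the active session."""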
state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
if state:
try:
# Convert list[Ticket] or list[dict] to list[Ticket] for Track object
tickets = []
for t in state.tasks:
if isinstance(t, dict):
tickets.append(models.Ticket(**t))
else:
tickets.append(t)
self.active_track = models.Track(
id=state.metadata.id,
description=state.metadata.name,
tickets=tickets
)
# Keep dicts for UI table (or convert models.Ticket objects back to dicts if needed)
self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets]
# Load track-scoped history
history = project_manager.load_track_history(track_id, self.ui_files_base_dir)
with self._disc_entries_lock:
if history:
self.disc_entries = models.parse_history_entries(history, self.disc_roles)
else:
self.disc_entries = []
self._recalculate_session_usage()
self._set_status(f"Loaded track: {state.metadata.name}")
except Exception as e:
self._set_status(f"Load track error: {e}")
print(f"Error loading track {track_id}: {e}")
def _save_active_project(self) -> None:
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self._set_status(f"save error: {e}")
def _get_discussion_names(self) -> list[str]:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
return sorted(discussions.keys())
def _switch_discussion(self, name: str) -> None:
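        """Flushes the current discussion to the project and activates the named one."""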
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if name not in discussions:
self._set_status(f"discussion not found: {name}")
return
self.active_discussion = name
self._track_discussion_active = False
disc_sec["active"] = name
disc_data = discussions[name]
with self._disc_entries_lock:
self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles)
self._set_status(f"discussion: {name}")
def _flush_disc_entries_to_project(self) -> None:
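        """Persists in-memory discussion entries to track history when a track discussion is active, otherwise to the project."""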
history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries]
if self.active_track and self._track_discussion_active:
project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir)
return
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion())
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str) -> None:
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self._set_status(f"discussion '{name}' already exists")
return
discussions[name] = project_manager.default_discussion()
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self._set_status(f"discussion '{new_name}' already exists")
return
discussions[new_name] = discussions.pop(old_name)
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
        if name not in discussions:
            return
        if len(discussions) <= 1:
            self._set_status("cannot delete the last discussion")
            return
del discussions[name]
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None:
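        """Resolves a pending MMA approval or spawn dialog with the user's decision, waking the thread blocked in wait()."""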
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
if self._pending_mma_spawn:
spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if spawn_dlg:
with spawn_dlg._condition:
spawn_dlg._approved = approved
spawn_dlg._abort = abort
if prompt is not None:
spawn_dlg._prompt = prompt
if context_md is not None:
spawn_dlg._context_md = context_md
spawn_dlg._done = True
spawn_dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self) -> None:
"""Responds with approval for a pending /api/ask request."""
        if not self._ask_request_id:
            return
request_id = self._ask_request_id
def do_post() -> None:
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": True}},
timeout=2
)
            except Exception as e:
                print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self) -> None:
"""Responds with rejection for a pending /api/ask request."""
        if not self._ask_request_id:
            return
request_id = self._ask_request_id
def do_post() -> None:
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": False}},
timeout=2
)
            except Exception as e:
                print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self) -> None:
"""Logic for resetting the AI session and GUI state."""
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
        with self._disc_entries_lock:
            self.disc_entries.clear()
# Clear history in ALL discussions to be safe
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
for d_name in discussions:
discussions[d_name]["history"] = []
self._set_status("session reset")
self.ai_response = ""
self.ui_ai_input = ""
self.ui_manual_approve = False
self.ui_auto_add_history = False
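        # Reset the provider/model to the session defaults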
self._current_provider = "gemini"
self._current_model = "gemini-2.5-flash-lite"
ai_client.set_provider(self._current_provider, self._current_model)
with self._pending_history_adds_lock:
self._pending_history_adds.clear()
with self._api_event_queue_lock:
self._api_event_queue.clear()
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.clear()
def _handle_md_only(self) -> None:
"""Logic for the 'MD Only' action."""
def worker():
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self._set_status(f"md written: {path.name}")
                # Refresh token-budget metrics using the freshly generated markdown
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self._set_status(f"error: {e}")
threading.Thread(target=worker, daemon=True).start()
def _handle_generate_send(self) -> None:
"""Logic for the 'Gen + Send' action."""
def worker():
sys.stderr.write("[DEBUG] _handle_generate_send worker started\n")
sys.stderr.flush()
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
self._set_status("sending...")
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
sys.stderr.write(f"[DEBUG] _do_generate success. Prompt: {user_msg[:50]}...\n")
sys.stderr.flush()
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
self.event_queue.put("user_request", event_payload)
sys.stderr.write("[DEBUG] Enqueued user_request event\n")
sys.stderr.flush()
except Exception as e:
import traceback
sys.stderr.write(f"[DEBUG] _do_generate ERROR: {e}\n{traceback.format_exc()}\n")
sys.stderr.flush()
self._set_status(f"generate error: {e}")
threading.Thread(target=worker, daemon=True).start()
def _recalculate_session_usage(self) -> None:
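        """Recomputes cumulative session token usage from the response entries in the comms log."""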
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]:
if k in usage:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None:
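        """Updates latency, session usage, token stats (when md_content is given), and Gemini cache info from an API payload."""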
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
if md_content is not None:
stats = ai_client.get_token_stats(md_content)
# Ensure compatibility if keys are named differently
if "total_tokens" in stats and "estimated_prompt_tokens" not in stats:
stats["estimated_prompt_tokens"] = stats["total_tokens"]
self._token_stats = stats
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def _flush_to_project(self) -> None:
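        """Writes current UI state (paths, toggles, agent tools, discussions, MMA state) back into the in-memory project dict."""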
proj = self.project
proj.setdefault("output", {})["output_dir"] = self.ui_output_dir
proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir
proj["files"]["paths"] = self.files
proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir
proj["screenshots"]["paths"] = self.screenshots
proj.setdefault("project", {})
proj["project"]["git_dir"] = self.ui_project_git_dir
proj["project"]["system_prompt"] = self.ui_project_system_prompt
proj["project"]["main_context"] = self.ui_project_main_context
proj["project"]["word_wrap"] = self.ui_word_wrap
proj["project"]["summary_only"] = self.ui_summary_only
proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms
proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls
proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in models.AGENT_TOOL_NAMES:
proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True)
self._flush_disc_entries_to_project()
disc_sec = proj.setdefault("discussion", {})
disc_sec["roles"] = self.disc_roles
disc_sec["active"] = self.active_discussion
disc_sec["auto_add"] = self.ui_auto_add_history
# Save MMA State
mma_sec = proj.setdefault("mma", {})
mma_sec["epic"] = self.ui_epic_input
mma_sec["tier_models"] = {t: {"model": d["model"], "provider": d.get("provider", "gemini")} for t, d in self.mma_tier_usage.items()}
if self.active_track:
mma_sec["active_track"] = asdict(self.active_track)
else:
mma_sec["active_track"] = None
def _flush_to_config(self) -> None:
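        """Writes AI, project-list, GUI, and theme settings back into the global config dict."""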
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"history_trunc_limit": self.history_trunc_limit,
}
self.config["ai"]["system_prompt"] = self.ui_global_system_prompt
self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path}
self.config["gui"] = {
"show_windows": self.show_windows,
"separate_message_panel": getattr(self, "ui_separate_message_panel", False),
"separate_response_panel": getattr(self, "ui_separate_response_panel", False),
"separate_tool_calls_panel": getattr(self, "ui_separate_tool_calls_panel", False),
}
theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]:
"""Returns (full_md, output_path, file_items, stable_md, discussion_text)."""
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
models.save_config(self.config)
track_id = self.active_track.id if self.active_track else None
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id)
full_md, path, file_items = aggregate.run(flat)
# Build stable markdown (no history) for Gemini caching
screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", "."))
screenshots = flat.get("screenshots", {}).get("paths", [])
summary_only = flat.get("project", {}).get("summary_only", False)
stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only)
# Build discussion history text separately
history = flat.get("discussion", {}).get("history", [])
discussion_text = aggregate.build_discussion_text(history)
return full_md, path, file_items, stable_md, discussion_text
def _cb_plan_epic(self) -> None:
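        """Runs Tier 1 epic planning in a background thread and queues GUI tasks to display the proposed tracks."""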
def _bg_task() -> None:
sys.stderr.write("[DEBUG] _cb_plan_epic _bg_task started\n")
sys.stderr.flush()
try:
self._set_status("Planning Epic (Tier 1)...")
history = orchestrator_pm.get_track_history_summary()
sys.stderr.write(f"[DEBUG] History summary length: {len(history)}\n")
sys.stderr.flush()
                flat = project_manager.flat_config(self.project)
file_items = aggregate.build_file_items(Path(self.ui_files_base_dir), flat.get("files", {}).get("paths", []))
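                # Snapshot the comms log length so Tier 1 token usage can be computed from the entries added by this call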
_t1_baseline = len(ai_client.get_comms_log())
tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
_t1_new = ai_client.get_comms_log()[_t1_baseline:]
_t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp)
_t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp)
def _push_t1_usage(i: int, o: int) -> None:
self.mma_tier_usage["Tier 1"]["input"] += i
self.mma_tier_usage["Tier 1"]["output"] += o
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "custom_callback",
"callback": _push_t1_usage,
"args": [_t1_in, _t1_out]
})
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": {
"text": json.dumps(tracks, indent=2),
"stream_id": "Tier 1",
"status": "Epic tracks generated."
}
})
self._pending_gui_tasks.append({
"action": "show_track_proposal",
"payload": tracks
})
except Exception as e:
self._set_status(f"Epic plan error: {e}")
print(f"ERROR in _cb_plan_epic background task: {e}")
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_accept_tracks(self) -> None:
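        """Accepts all proposed tracks: generates Python skeletons once for the file set, then starts each track with the shared skeletons."""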
self._show_track_proposal_modal = False
def _bg_task() -> None:
sys.stderr.write("[DEBUG] _cb_accept_tracks _bg_task started\n")
# Generate skeletons once
self._set_status("Phase 2: Generating skeletons for all tracks...")
sys.stderr.write("[DEBUG] Creating ASTParser...\n")
parser = ASTParser(language="python")
generated_skeletons = ""
try:
# Use a local copy of files to avoid concurrent modification issues
files_to_scan = list(self.files)
sys.stderr.write(f"[DEBUG] Scanning {len(files_to_scan)} files for skeletons...\n")
for i, file_path in enumerate(files_to_scan):
try:
self._set_status(f"Phase 2: Scanning files ({i+1}/{len(files_to_scan)})...")
abs_path = Path(self.ui_files_base_dir) / file_path
if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f:
code = f.read()
generated_skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n"
except Exception as e:
sys.stderr.write(f"[DEBUG] Error parsing skeleton for {file_path}: {e}\n")
except Exception as e:
sys.stderr.write(f"[DEBUG] Error in scan loop: {e}\n")
self._set_status(f"Error generating skeletons: {e}")
return # Exit if skeleton generation fails
sys.stderr.write("[DEBUG] Skeleton generation complete. Starting tracks...\n")
# Now loop through tracks and call _start_track_logic with generated skeletons
total_tracks = len(self.proposed_tracks)
for i, track_data in enumerate(self.proposed_tracks):
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self._set_status(f"Processing track {i+1} of {total_tracks}: '{title}'...")
self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons
sys.stderr.write("[DEBUG] All tracks started. Refreshing...\n")
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started
self._set_status(f"All {total_tracks} tracks accepted and execution started.")
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data: Any = None) -> None:
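        """Starts track execution from a track id, or from an index/dict selecting one of the proposed tracks."""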
if isinstance(user_data, str):
# If track_id is provided directly
track_id = user_data
# Ensure it's loaded as active
if not self.active_track or self.active_track.id != track_id:
self._cb_load_track(track_id)
if self.active_track:
# Use the active track object directly to start execution
self._set_mma_status("running")
engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode)
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
full_md, _, _ = aggregate.run(flat)
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
self._set_status(f"Track '{self.active_track.description}' started.")
return
idx = 0
if isinstance(user_data, int):
idx = user_data
elif isinstance(user_data, dict):
idx = user_data.get("index", 0)
if 0 <= idx < len(self.proposed_tracks):
track_data = self.proposed_tracks[idx]
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
            threading.Thread(target=self._start_track_logic, args=(track_data,), daemon=True).start()
self._set_status(f"Track '{title}' started.")
def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
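        """Generates tickets for one track via the Tech Lead, sorts them by dependency, persists the track state, and launches a ConductorEngine."""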
try:
goal = track_data.get("goal", "")
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self._set_status(f"Phase 2: Generating tickets for {title}...")
skeletons = skeletons_str or "" # Use provided skeletons or empty
self._set_status("Phase 2: Calling Tech Lead...")
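            # Snapshot the comms log length so Tier 2 token usage can be computed from the entries added by this call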
_t2_baseline = len(ai_client.get_comms_log())
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
_t2_new = ai_client.get_comms_log()[_t2_baseline:]
_t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
_t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
def _push_t2_usage(i: int, o: int) -> None:
self.mma_tier_usage["Tier 2"]["input"] += i
self.mma_tier_usage["Tier 2"]["output"] += o
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "custom_callback",
"callback": _push_t2_usage,
"args": [_t2_in, _t2_out]
})
if not raw_tickets:
self._set_status(f"Error: No tickets generated for track: {title}")
print(f"Warning: No tickets generated for track: {title}")
return
self._set_status("Phase 2: Sorting tickets...")
try:
sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets)
except ValueError as e:
print(f"Dependency error in track '{title}': {e}")
sorted_tickets_data = raw_tickets
            # Create Track and Ticket objects from the sorted ticket data
tickets = []
for t_data in sorted_tickets_data:
ticket = models.Ticket(
id=t_data["id"],
description=t_data.get("description") or t_data.get("goal", "No description"),
status=t_data.get("status", "todo"),
assigned_to=t_data.get("assigned_to", "unassigned"),
depends_on=t_data.get("depends_on", []),
step_mode=t_data.get("step_mode", False)
)
tickets.append(ticket)
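            # Derive a deterministic track id from the project path and title via uuid5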
track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}"
track = models.Track(id=track_id, description=title, tickets=tickets)
# Initialize track state in the filesystem
meta = models.Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now())
state = models.TrackState(metadata=meta, discussion=[], tasks=tickets)
project_manager.save_track_state(track_id, state, self.ui_files_base_dir)
# Add to memory and notify UI
self.tracks.append({"id": track_id, "title": title, "status": "todo"})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({'action': 'refresh_from_project'})
            # Initialize the ConductorEngine and start its run loop
engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode)
# Use current full markdown context for the track execution
track_id_param = track.id
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param)
full_md, _, _ = aggregate.run(flat)
# Start the engine in a separate thread
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
except Exception as e:
self._set_status(f"Track start error: {e}")
print(f"ERROR in _start_track_logic: {e}")
def _cb_ticket_retry(self, ticket_id: str) -> None:
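        """Resets the ticket to 'todo' locally and asks the conductor to retry it."""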
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'todo'
break
self.event_queue.put("mma_retry", {"ticket_id": ticket_id})
def _cb_ticket_skip(self, ticket_id: str) -> None:
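        """Marks the ticket as skipped locally and notifies the conductor."""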
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'skipped'
break
self.event_queue.put("mma_skip", {"ticket_id": ticket_id})
def _cb_run_conductor_setup(self) -> None:
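        """Summarizes the conductor directory (files, line counts, tracks) into ui_conductor_setup_summary."""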
base = paths.get_conductor_dir()
if not base.exists():
self.ui_conductor_setup_summary = f"Error: {base}/ directory not found."
return
        files = [f for f in base.glob("**/*") if f.is_file()]
summary = [f"Conductor Directory: {base.absolute()}"]
summary.append(f"Total Files: {len(files)}")
total_lines = 0
for f in files:
try:
with open(f, "r", encoding="utf-8") as fd:
lines = len(fd.readlines())
total_lines += lines
summary.append(f"- {f.relative_to(base)}: {lines} lines")
except Exception:
summary.append(f"- {f.relative_to(base)}: Error reading")
summary.append(f"Total Line Count: {total_lines}")
tracks_dir = base / "tracks"
if tracks_dir.exists():
tracks = [d for d in tracks_dir.iterdir() if d.is_dir()]
summary.append(f"Total Tracks Found: {len(tracks)}")
else:
summary.append("Tracks Directory: Not found")
self.ui_conductor_setup_summary = "\n".join(summary)
def _cb_create_track(self, name: str, desc: str, track_type: str) -> None:
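        """Creates a new track directory with spec, plan, and metadata files, then reloads the track list from disk."""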
        if not name:
            return
date_suffix = datetime.now().strftime("%Y%m%d")
track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}"
track_dir = paths.get_tracks_dir() / track_id
track_dir.mkdir(parents=True, exist_ok=True)
spec_file = track_dir / "spec.md"
with open(spec_file, "w", encoding="utf-8") as f:
f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n")
plan_file = track_dir / "plan.md"
with open(plan_file, "w", encoding="utf-8") as f:
f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n")
meta_file = track_dir / "metadata.json"
with open(meta_file, "w", encoding="utf-8") as f:
json.dump({
"id": track_id,
"title": name,
"description": desc,
"type": track_type,
"status": "new",
"progress": 0.0
}, f, indent=1)
# Refresh tracks from disk
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
def _push_mma_state_update(self) -> None:
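        """Syncs the UI ticket dicts back onto the active track and persists the full track state to disk."""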
if not self.active_track:
return
# Sync active_tickets (list of dicts) back to active_track.tickets (list of models.Ticket objects)
self.active_track.tickets = [models.Ticket.from_dict(t) for t in self.active_tickets]
# Save the state to disk
existing = project_manager.load_track_state(self.active_track.id, self.ui_files_base_dir)
meta = models.Metadata(
id=self.active_track.id,
name=self.active_track.description,
status=self.mma_status,
created_at=existing.metadata.created_at if existing else datetime.now(),
updated_at=datetime.now()
)
state = models.TrackState(
metadata=meta,
discussion=existing.discussion if existing else [],
tasks=self.active_track.tickets
)
project_manager.save_track_state(self.active_track.id, state, self.ui_files_base_dir)