import threading import time import sys import os import re from typing import Any, List, Dict, Optional, Callable from pathlib import Path import json import uuid import tomli_w import requests from dataclasses import asdict from datetime import datetime from tkinter import filedialog, Tk from fastapi import FastAPI, Depends, HTTPException from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel from src import events from src import paths from src import session_logger from src import project_manager from src import performance_monitor from src import models from src.file_cache import ASTParser from src import ai_client from src import shell_runner from src import mcp_client from src import aggregate from src import orchestrator_pm from src import conductor_tech_lead from src import multi_agent_conductor from src import theme def hide_tk_root() -> Tk: root = Tk() root.withdraw() root.wm_attributes("-topmost", True) return root def parse_symbols(text: str) -> list[str]: """ Finds all occurrences of '@SymbolName' in text and returns SymbolName. SymbolName can be a function, class, or method (e.g. @MyClass, @my_func, @MyClass.my_method). """ return re.findall(r"@([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)", text) class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True temperature: float | None = None max_tokens: int | None = None class ConfirmRequest(BaseModel): approved: bool script: Optional[str] = None class ConfirmDialog: def __init__(self, script: str, base_dir: str) -> None: self._uid = str(uuid.uuid4()) self._script = str(script) if script is not None else "" self._base_dir = str(base_dir) if base_dir is not None else "" self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._script self._condition.wait(timeout=0.1) return self._approved, self._script class MMAApprovalDialog: def __init__(self, ticket_id: str, payload: str) -> None: self._payload = payload self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._payload self._condition.wait(timeout=0.1) return self._approved, self._payload class MMASpawnApprovalDialog: def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: self._prompt = prompt self._context_md = context_md self._condition = threading.Condition() self._done = False self._approved = False self._abort = False def wait(self) -> dict[str, Any]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md} self._condition.wait(timeout=0.1) return { 'approved': self._approved, 'abort': self._abort, 'prompt': self._prompt, 'context_md': self._context_md } class AppController: """ The headless controller for the Manual Slop application. Owns the application state and manages background services. """ PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek", "minimax"] def __init__(self): # Initialize locks first to avoid initialization order issues self._send_thread_lock: threading.Lock = threading.Lock() self._disc_entries_lock: threading.Lock = threading.Lock() self._pending_comms_lock: threading.Lock = threading.Lock() self._pending_tool_calls_lock: threading.Lock = threading.Lock() self._pending_history_adds_lock: threading.Lock = threading.Lock() self._pending_gui_tasks_lock: threading.Lock = threading.Lock() self._pending_dialog_lock: threading.Lock = threading.Lock() self._api_event_queue_lock: threading.Lock = threading.Lock() self.config: Dict[str, Any] = {} self.project: Dict[str, Any] = {} self.active_project_path: str = "" self.project_paths: List[str] = [] self.active_discussion: str = "main" self.disc_entries: List[Dict[str, Any]] = [] self.disc_roles: List[str] = [] self.files: List[str] = [] self.screenshots: List[str] = [] self.event_queue: events.SyncEventQueue = events.SyncEventQueue() self._loop_thread: Optional[threading.Thread] = None self.tracks: List[Dict[str, Any]] = [] self.active_track: Optional[models.Track] = None self.active_tickets: List[Dict[str, Any]] = [] self.mma_streams: Dict[str, str] = {} self._worker_status: Dict[str, str] = {} # stream_id -> "running" | "completed" | "failed" | "killed" self.MAX_STREAM_SIZE: int = 10 * 1024 # 10KB max per stream self._pending_patch_text: Optional[str] = None self._pending_patch_files: List[str] = [] self._show_patch_modal: bool = False self._patch_error_message: Optional[str] = None self.mma_status: str = "idle" self._tool_log: List[Dict[str, Any]] = [] self._tool_stats: Dict[str, Dict[str, Any]] = {} # {tool_name: {"count": 0, "total_time_ms": 0.0, "failures": 0}} self._cached_cache_stats: Dict[str, Any] = {} # Pre-computed cache stats for GUI self._cached_files: List[str] = [] self._token_history: List[Dict[str, Any]] = [] # Token usage over time [{"time": t, "input": n, "output": n, "model": s}, ...] self._session_start_time: float = time.time() # For calculating burn rate self._ticket_start_times: dict[str, float] = {} self._avg_ticket_time: float = 0.0 self._completed_ticket_count: int = 0 self._comms_log: List[Dict[str, Any]] = [] self.session_usage: Dict[str, Any] = { "input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0 } self.mma_tier_usage: Dict[str, Dict[str, Any]] = { "Tier 1": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3.1-pro-preview"}, "Tier 2": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3-flash-preview"}, "Tier 3": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite"}, "Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite"}, } self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor() self._pending_gui_tasks: List[Dict[str, Any]] = [] self._api_event_queue: List[Dict[str, Any]] = [] # Pending dialogs state moved from App self._pending_dialog: Optional[ConfirmDialog] = None self._pending_dialog_open: bool = False self._pending_actions: Dict[str, ConfirmDialog] = {} self._pending_ask_dialog: bool = False # AI settings state self._current_provider: str = "gemini" self._current_model: str = "gemini-2.5-flash-lite" self.temperature: float = 0.0 self.max_tokens: int = 8192 self.history_trunc_limit: int = 8000 # UI-related state moved to controller self.ui_ai_input: str = "" self.ui_disc_new_name_input: str = "" self.ui_disc_new_role_input: str = "" self.ui_epic_input: str = "" self.ui_new_track_name: str = "" self.ui_new_track_desc: str = "" self.ui_new_track_type: str = "feature" self.ui_conductor_setup_summary: str = "" self.ui_last_script_text: str = "" self.ui_last_script_output: str = "" self.ui_new_ticket_id: str = "" self.ui_new_ticket_desc: str = "" self.ui_new_ticket_target: str = "" self.ui_new_ticket_deps: str = "" self.ui_output_dir: str = "" self.ui_files_base_dir: str = "" self.ui_shots_base_dir: str = "" self.ui_project_git_dir: str = "" self.ui_project_main_context: str = "" self.ui_project_system_prompt: str = "" self.ui_gemini_cli_path: str = "gemini" self.ui_word_wrap: bool = True self.ui_summary_only: bool = False self.ui_auto_add_history: bool = False self.ui_global_system_prompt: str = "" self.ui_agent_tools: Dict[str, bool] = {} self.available_models: List[str] = [] self.all_available_models: Dict[str, List[str]] = {} # provider -> list of models self._autofocus_response_tab = False self.proposed_tracks: List[Dict[str, Any]] = [] self._show_track_proposal_modal: bool = False self.ai_status: str = 'idle' self.ai_response: str = '' self.last_md: str = '' self.last_md_path: Optional[Path] = None self.last_file_items: List[Any] = [] self.send_thread: Optional[threading.Thread] = None self.models_thread: Optional[threading.Thread] = None self.show_windows: Dict[str, bool] = {} self.show_script_output: bool = False self.show_text_viewer: bool = False self.text_viewer_title: str = '' self.text_viewer_content: str = '' self._pending_comms: List[Dict[str, Any]] = [] self._pending_tool_calls: List[Dict[str, Any]] = [] self._pending_history_adds: List[Dict[str, Any]] = [] self.perf_history: Dict[str, List[float]] = {'frame_time': [0.0]*100, 'fps': [0.0]*100, 'cpu': [0.0]*100, 'input_lag': [0.0]*100} self._perf_last_update: float = 0.0 self._autosave_interval: float = 60.0 self._last_autosave: float = time.time() # More state moved from App self._ask_dialog_open: bool = False self._ask_request_id: Optional[str] = None self._ask_tool_data: Optional[Dict[str, Any]] = None self.mma_step_mode: bool = False self.active_tier: Optional[str] = None self.ui_focus_agent: Optional[str] = None self._pending_mma_approval: Optional[Dict[str, Any]] = None self._mma_approval_open: bool = False self._mma_approval_edit_mode: bool = False self._mma_approval_payload: str = "" self._pending_mma_spawn: Optional[Dict[str, Any]] = None self._mma_spawn_open: bool = False self._mma_spawn_edit_mode: bool = False self._mma_spawn_prompt: str = '' self._mma_spawn_context: str = '' self._trigger_blink: bool = False self._is_blinking: bool = False self._blink_start_time: float = 0.0 self._trigger_script_blink: bool = False self._is_script_blinking: bool = False self._script_blink_start_time: float = 0.0 self._scroll_disc_to_bottom: bool = False self._scroll_comms_to_bottom: bool = False self._scroll_tool_calls_to_bottom: bool = False self._gemini_cache_text: str = "" self._last_stable_md: str = '' self._token_stats: Dict[str, Any] = {} self._token_stats_dirty: bool = False self.ui_disc_truncate_pairs: int = 2 self.ui_auto_scroll_comms: bool = True self.ui_auto_scroll_tool_calls: bool = True self._show_add_ticket_form: bool = False self._track_discussion_active: bool = False self._tier_stream_last_len: Dict[str, int] = {} self.is_viewing_prior_session: bool = False self.prior_session_entries: List[Dict[str, Any]] = [] self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1") self.ui_manual_approve: bool = False # Injection state self._inject_file_path: str = "" self._inject_mode: str = "skeleton" self._inject_preview: str = "" self._show_inject_modal: bool = False self._settable_fields: Dict[str, str] = { 'ai_input': 'ui_ai_input', 'project_git_dir': 'ui_project_git_dir', 'auto_add_history': 'ui_auto_add_history', 'disc_new_name_input': 'ui_disc_new_name_input', 'project_main_context': 'ui_project_main_context', 'gcli_path': 'ui_gemini_cli_path', 'output_dir': 'ui_output_dir', 'files_base_dir': 'ui_files_base_dir', 'ai_status': 'ai_status', 'ai_response': 'ai_response', 'active_discussion': 'active_discussion', 'current_provider': 'current_provider', 'current_model': 'current_model', 'token_budget_pct': '_token_budget_pct', 'token_budget_current': '_token_budget_current', 'token_budget_label': '_token_budget_label', 'show_confirm_modal': 'show_confirm_modal', 'mma_epic_input': 'ui_epic_input', 'mma_status': 'mma_status', 'mma_active_tier': 'active_tier', 'ui_new_track_name': 'ui_new_track_name', 'ui_new_track_desc': 'ui_new_track_desc', 'manual_approve': 'ui_manual_approve', 'inject_file_path': '_inject_file_path', 'inject_mode': '_inject_mode', 'show_inject_modal': '_show_inject_modal' } self._gettable_fields = dict(self._settable_fields) self._gettable_fields.update({ 'ui_focus_agent': 'ui_focus_agent', 'active_discussion': 'active_discussion', '_track_discussion_active': '_track_discussion_active', 'proposed_tracks': 'proposed_tracks', 'mma_streams': 'mma_streams', '_worker_status': '_worker_status', 'active_track': 'active_track', 'active_tickets': 'active_tickets', 'tracks': 'tracks', 'thinking_indicator': 'thinking_indicator', 'operations_live_indicator': 'operations_live_indicator', 'prior_session_indicator': 'prior_session_indicator', '_show_patch_modal': '_show_patch_modal', '_pending_patch_text': '_pending_patch_text', '_pending_patch_files': '_pending_patch_files', '_inject_file_path': '_inject_file_path', '_inject_mode': '_inject_mode', '_inject_preview': '_inject_preview', '_show_inject_modal': '_show_inject_modal' }) self.perf_monitor = performance_monitor.get_monitor() self._perf_profiling_enabled = False self._init_actions() @property def perf_profiling_enabled(self) -> bool: return self._perf_profiling_enabled @perf_profiling_enabled.setter def perf_profiling_enabled(self, value: bool) -> None: self._perf_profiling_enabled = value if hasattr(self, 'perf_monitor'): self.perf_monitor.enabled = value def _update_inject_preview(self) -> None: """Updates the preview content based on the selected file and injection mode.""" if not self._inject_file_path: self._inject_preview = "" return target_path = self._inject_file_path if not os.path.isabs(target_path): target_path = os.path.join(self.ui_files_base_dir, target_path) if not os.path.exists(target_path): self._inject_preview = "" return try: with open(target_path, "r", encoding="utf-8") as f: content = f.read() if self._inject_mode == "skeleton" and target_path.endswith(".py"): parser = ASTParser("python") preview = parser.get_skeleton(content) else: preview = content lines = preview.splitlines() if len(lines) > 500: preview = "\n".join(lines[:500]) + "\n... (truncated)" self._inject_preview = preview except Exception as e: self._inject_preview = f"Error reading file: {e}" @property def thinking_indicator(self) -> bool: return self.ai_status in ("sending...", "streaming...") @property def operations_live_indicator(self) -> bool: return not self.is_viewing_prior_session @property def prior_session_indicator(self) -> bool: return self.is_viewing_prior_session def _init_actions(self) -> None: # Set up state-related action maps self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, 'btn_md_only': self._handle_md_only, 'btn_approve_script': self._handle_approve_script, 'btn_reject_script': self._handle_reject_script, 'btn_project_save': self._cb_project_save, 'btn_disc_create': self._cb_disc_create, 'btn_mma_plan_epic': self._cb_plan_epic, 'btn_mma_accept_tracks': self._cb_accept_tracks, 'btn_mma_start_track': self._cb_start_track, 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), 'btn_approve_tool': self._handle_approve_ask, 'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True), 'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True), 'btn_prune_logs': self.cb_prune_logs, } self._predefined_callbacks: dict[str, Callable[..., Any]] = { '_test_callback_func_write_to_file': self._test_callback_func_write_to_file, '_set_env_var': lambda k, v: os.environ.update({k: v}) } def _update_gcli_adapter(self, path: str) -> None: sys.stderr.write(f"[DEBUG] _update_gcli_adapter called with: {path}\n") sys.stderr.flush() if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(path)) else: ai_client._gemini_cli_adapter.binary_path = str(path) def _set_status(self, status: str) -> None: """Thread-safe update of ai_status via the GUI task queue.""" with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "set_ai_status", "payload": status }) def _set_mma_status(self, status: str) -> None: """Thread-safe update of mma_status via the GUI task queue.""" with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "set_mma_status", "payload": status }) def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: return sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n") sys.stderr.flush() with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] self._pending_gui_tasks.clear() for task in tasks: try: action = task.get("action") sys.stderr.write(f"[DEBUG] Processing GUI task: action={action}\n") sys.stderr.flush() if action: session_logger.log_api_hook("PROCESS_TASK", action, str(task)) # ... if action == "refresh_api_metrics": self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None) elif action == "set_ai_status": self.ai_status = task.get("payload", "") sys.stderr.write(f"[DEBUG] Updated ai_status via task to: {self.ai_status}\n") sys.stderr.flush() elif action == "set_mma_status": self.mma_status = task.get("payload", "") elif action == "handle_ai_response": payload = task.get("payload", {}) text = payload.get("text", "") stream_id = payload.get("stream_id") is_streaming = payload.get("status") == "streaming..." if stream_id: if is_streaming: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" if stream_id not in self._worker_status: self._worker_status[stream_id] = "running" self.mma_streams[stream_id] += text if len(self.mma_streams[stream_id]) > self.MAX_STREAM_SIZE: self.mma_streams[stream_id] = self.mma_streams[stream_id][-self.MAX_STREAM_SIZE:] else: self.mma_streams[stream_id] = text if stream_id in self._worker_status and self._worker_status[stream_id] == "running": self._worker_status[stream_id] = "completed" if stream_id == "Tier 1": if "status" in payload: self.ai_status = payload["status"] else: if is_streaming: self.ai_response += text else: self.ai_response = text self.ai_status = payload.get("status", "done") sys.stderr.write(f"[DEBUG] Updated ai_status to: {self.ai_status}\n") sys.stderr.flush() self._trigger_blink = True if not stream_id: self._token_stats_dirty = True if not is_streaming: self._autofocus_response_tab = True # ONLY add to history when turn is complete if self.ui_auto_add_history and not stream_id and not is_streaming: role = payload.get("role", "AI") with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": self.ai_response, "collapsed": True, "ts": project_manager.now_ts() }) elif action in ("mma_stream", "mma_stream_append"): # Some events might have these at top level, some in a 'payload' dict stream_id = task.get("stream_id") or task.get("payload", {}).get("stream_id") text = task.get("text") or task.get("payload", {}).get("text", "") if stream_id: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" self.mma_streams[stream_id] += text elif action == "show_track_proposal": self.proposed_tracks = task.get("payload", []) self._show_track_proposal_modal = True elif action == "mma_state_update": # Handle both internal (nested) and hook-server (flattened) payloads p = task.get("payload") if not isinstance(p, dict): p = task # Fallback to task itself if payload is missing or wrong type sys.stderr.write(f"[DEBUG] mma_state_update: status={p.get('status')} active_tier={p.get('active_tier')}\n") sys.stderr.flush() self.mma_status = p.get("status", self.mma_status) self.active_tier = p.get("active_tier", self.active_tier) # Preserve existing model/provider config if not explicitly in payload new_usage = p.get("tier_usage", {}) for tier, data in new_usage.items(): if tier in self.mma_tier_usage: # Update usage counts but keep selected model/provider if not in update self.mma_tier_usage[tier]["input"] = data.get("input", self.mma_tier_usage[tier]["input"]) self.mma_tier_usage[tier]["output"] = data.get("output", self.mma_tier_usage[tier]["output"]) if "model" in data: self.mma_tier_usage[tier]["model"] = data["model"] if "provider" in data: self.mma_tier_usage[tier]["provider"] = data["provider"] else: self.mma_tier_usage[tier] = data self.active_tickets = p.get("tickets", []) track_data = p.get("track") if track_data: tickets = [] for t_data in self.active_tickets: if isinstance(t_data, models.Ticket): tickets.append(t_data) else: # Map 'goal' from Godot format to 'description' if needed if "goal" in t_data and "description" not in t_data: t_data["description"] = t_data["goal"] tickets.append(models.Ticket.from_dict(t_data)) self.active_track = models.Track( id=track_data.get("id"), description=track_data.get("title", ""), tickets=tickets ) elif action == "set_value": item = task.get("item") value = task.get("value") sys.stderr.write(f"[DEBUG] Processing set_value: {item}={value}\n") sys.stderr.flush() if item in self._settable_fields: attr_name = self._settable_fields[item] setattr(self, attr_name, value) sys.stderr.write(f"[DEBUG] Set {attr_name} to {value}\n") sys.stderr.flush() if item == "gcli_path": self._update_gcli_adapter(str(value)) elif action == "click": item = task.get("item") user_data = task.get("user_data") sys.stderr.write(f"[DEBUG] Processing click: {item} (user_data={user_data})\n") sys.stderr.flush() if item == "btn_project_new_automated": self._cb_new_project_automated(user_data) elif item == "btn_mma_load_track": self._cb_load_track(str(user_data or "")) elif item in self._clickable_actions: import inspect func = self._clickable_actions[item] try: sig = inspect.signature(func) if 'user_data' in sig.parameters: func(user_data=user_data) else: func() except Exception: func() elif action == "select_list_item": item = task.get("listbox", task.get("item")) value = task.get("item_value", task.get("value")) if item == "disc_listbox": self._switch_discussion(str(value or "")) elif task.get("type") == "ask": self._pending_ask_dialog = True self._ask_request_id = task.get("request_id") self._ask_tool_data = task.get("data", {}) elif action == "clear_ask": if self._ask_request_id == task.get("request_id"): self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None elif action == "custom_callback": cb = task.get("callback") args = task.get("args", []) if callable(cb): try: cb(*args) except Exception as e: print(f"Error in direct custom callback: {e}") elif cb in self._predefined_callbacks: self._predefined_callbacks[cb](*args) elif action == "mma_step_approval": dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or "")) self._pending_mma_approval = task if "dialog_container" in task: task["dialog_container"][0] = dlg elif action == 'refresh_from_project': self._refresh_from_project() elif action == "show_patch_modal": self._pending_patch_text = task.get("patch_text", "") self._pending_patch_files = task.get("file_paths", []) self._show_patch_modal = True elif action == "hide_patch_modal": self._show_patch_modal = False self._pending_patch_text = None self._pending_patch_files = [] elif action == "mma_spawn_approval": spawn_dlg = MMASpawnApprovalDialog( str(task.get("ticket_id") or ""), str(task.get("role") or ""), str(task.get("prompt") or ""), str(task.get("context_md") or "") ) self._pending_mma_spawn = task self._mma_spawn_prompt = task.get("prompt", "") self._mma_spawn_context = task.get("context_md", "") self._mma_spawn_open = True self._mma_spawn_edit_mode = False if "dialog_container" in task: task["dialog_container"][0] = spawn_dlg elif action == "ticket_started": payload = task.get("payload", {}) ticket_id = payload.get("ticket_id") start_time = payload.get("timestamp") if ticket_id and start_time: self._ticket_start_times[ticket_id] = start_time elif action == "ticket_completed": payload = task.get("payload", {}) ticket_id = payload.get("ticket_id") end_time = payload.get("timestamp") if ticket_id and end_time and ticket_id in self._ticket_start_times: start_time = self._ticket_start_times.pop(ticket_id) elapsed = end_time - start_time self._completed_ticket_count += 1 self._avg_ticket_time = ((self._avg_ticket_time * (self._completed_ticket_count - 1)) + elapsed) / self._completed_ticket_count except Exception as e: import traceback sys.stderr.write(f"[DEBUG] Error executing GUI task: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() print(f"Error executing GUI task: {e}") def _process_pending_history_adds(self) -> None: """Synchronizes pending history entries to the active discussion and project state.""" with self._pending_history_adds_lock: items = self._pending_history_adds[:] self._pending_history_adds.clear() if not items: return self._scroll_disc_to_bottom = True for item in items: item.get("role", "unknown") if item.get("role") and item["role"] not in self.disc_roles: self.disc_roles.append(item["role"]) disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) disc_data = discussions.get(self.active_discussion) if disc_data is not None: if item.get("disc_title", self.active_discussion) == self.active_discussion: if self.disc_entries is not disc_data.get("history"): if "history" not in disc_data: disc_data["history"] = [] disc_data["history"].append(project_manager.entry_to_str(item)) disc_data["last_updated"] = project_manager.now_ts() with self._disc_entries_lock: self.disc_entries.append(item) def _process_pending_tool_calls(self) -> bool: """Drains pending tool calls into the tool log. Returns True if any were processed.""" with self._pending_tool_calls_lock: items = self._pending_tool_calls[:] self._pending_tool_calls.clear() if not items: return False for item in items: self._append_tool_log( item.get("script", ""), item.get("result", ""), source_tier=item.get("source_tier") ) return True def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" callback_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests", "artifacts", "temp_callback_output.txt") os.makedirs(os.path.dirname(callback_path), exist_ok=True) with open(callback_path, "w") as f: f.write(data) def _handle_approve_script(self, user_data=None) -> None: """Approves the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def _handle_reject_script(self, user_data=None) -> None: """Rejects the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def init_state(self): """Initializes the application state from configurations.""" self.config = models.load_config() ai_cfg = self.config.get("ai", {}) self._current_provider = ai_cfg.get("provider", "gemini") self._current_model = ai_cfg.get("model", "gemini-2.5-flash-lite") self.temperature = ai_cfg.get("temperature", 0.0) self.max_tokens = ai_cfg.get("max_tokens", 8192) self.history_trunc_limit = ai_cfg.get("history_trunc_limit", 8000) projects_cfg = self.config.get("projects", {}) self.project_paths = list(projects_cfg.get("paths", [])) self.active_project_path = projects_cfg.get("active", "") self._load_active_project() # Deserialize FileItems in files.paths raw_paths = self.project.get("files", {}).get("paths", []) self.files = [] for p in raw_paths: if isinstance(p, models.FileItem): self.files.append(p) elif isinstance(p, dict): self.files.append(models.FileItem.from_dict(p)) else: self.files.append(models.FileItem(path=str(p))) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System", "Reasoning", "Context"])) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) # UI state self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_main_context = proj_meta.get("main_context", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self._update_gcli_adapter(self.ui_gemini_cli_path) self.ui_word_wrap = proj_meta.get("word_wrap", True) self.ui_summary_only = proj_meta.get("summary_only", False) self.ui_auto_add_history = disc_sec.get("auto_add", False) self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "") _default_windows = { "Context Hub": True, "Files & Media": True, "AI Settings": True, "MMA Dashboard": True, "Tier 1: Strategy": True, "Tier 2: Tech Lead": True, "Tier 3: Workers": True, "Tier 4: QA": True, "Discussion Hub": True, "Operations Hub": True, "Message": False, "Response": False, "Tool Calls": False, "Theme": True, "Log Management": False, "Diagnostics": False, } saved = self.config.get("gui", {}).get("show_windows", {}) self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()} agent_tools_cfg = self.project.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} label = self.project.get("project", {}).get("name", "") session_logger.open_session(label=label) def cb_load_prior_log(self) -> None: root = hide_tk_root() path = filedialog.askopenfilename( title="Load Session Log", initialdir=str(paths.get_logs_dir()), filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] ) root.destroy() if not path: return entries = [] try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: continue except Exception as e: self._set_status(f"log load error: {e}") return self.prior_session_entries = entries self.is_viewing_prior_session = True self._set_status(f"viewing prior session: {Path(path).name} ({len(entries)} entries)") def cb_prune_logs(self) -> None: """Manually triggers the log pruning process with aggressive thresholds.""" self._set_status("Manual prune started (Age > 0d, Size < 100KB)...") def run_manual_prune() -> None: try: from src import log_registry from src import log_pruner registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir())) # Aggressive: Prune anything not whitelisted, even if just created, if under 100KB # Note: max_age_days=0 means cutoff is NOW. pruner.prune(max_age_days=0, min_size_kb=100) self._set_status("Manual prune complete.") except Exception as e: self._set_status(f"Manual prune error: {e}") print(f"Error during manual log pruning: {e}") thread = threading.Thread(target=run_manual_prune, daemon=True) thread.start() def _load_active_project(self) -> None: """Loads the active project configuration, with fallbacks.""" if self.active_project_path and Path(self.active_project_path).exists(): try: self.project = project_manager.load_project(self.active_project_path) return except Exception as e: print(f"Failed to load project {self.active_project_path}: {e}") for pp in self.project_paths: if Path(pp).exists(): try: self.project = project_manager.load_project(pp) self.active_project_path = pp return except Exception: continue self.project = project_manager.migrate_from_legacy_config(self.config) name = self.project.get("project", {}).get("name", "project") fallback_path = f"{name}.toml" project_manager.save_project(self.project, fallback_path) self.active_project_path = fallback_path if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" def run_prune() -> None: try: from src import log_registry from src import log_pruner registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir())) pruner.prune() except Exception as e: print(f"Error during log pruning: {e}") thread = threading.Thread(target=run_prune, daemon=True) thread.start() def _fetch_models(self, provider: str) -> None: self._set_status("fetching models...") def do_fetch() -> None: try: for p in self.PROVIDERS: try: self.all_available_models[p] = ai_client.list_models(p) except Exception as e: sys.stderr.write(f"[DEBUG] Error fetching models for {p}: {e}\n") self.all_available_models[p] = [] models_list = self.all_available_models.get(provider, []) self.available_models = models_list if self.current_model not in models_list and models_list: self.current_model = models_list[0] ai_client.set_provider(self._current_provider, self.current_model) self._set_status(f"models loaded: {len(models_list)}") except Exception as e: self._set_status(f"model fetch error: {e}") self.models_thread = threading.Thread(target=do_fetch, daemon=True) self.models_thread.start() def start_services(self, app: Any = None): """Starts background threads.""" sys.stderr.write("[DEBUG] AppController.start_services called\n") sys.stderr.flush() self._prune_old_logs() self._init_ai_and_hooks(app) self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() sys.stderr.write(f"[DEBUG] _loop_thread started: {self._loop_thread.ident}\n") sys.stderr.flush() def shutdown(self) -> None: """Stops background threads and cleans up resources.""" from src import ai_client ai_client.cleanup() if hasattr(self, 'hook_server') and self.hook_server: self.hook_server.stop() self.event_queue.put("shutdown", None) if self._loop_thread and self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) def _init_ai_and_hooks(self, app: Any = None) -> None: from src import api_hooks ai_client.set_provider(self._current_provider, self._current_model) if self._current_provider == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path ai_client.confirm_and_run_callback = self._confirm_and_run ai_client.comms_log_callback = self._on_comms_entry ai_client.tool_log_callback = self._on_tool_log mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics self.perf_monitor.alert_callback = self._on_performance_alert ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw)) ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw)) ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw)) self.hook_server = api_hooks.HookServer(app if app else self) self.hook_server.start() def _run_event_loop(self): """Internal loop runner.""" def queue_fallback() -> None: while True: try: if hasattr(self, '_process_pending_gui_tasks'): self._process_pending_gui_tasks() if hasattr(self, '_process_pending_history_adds'): self._process_pending_history_adds() except: pass time.sleep(0.1) fallback_thread = threading.Thread(target=queue_fallback, daemon=True) fallback_thread.start() self._process_event_queue() def _process_event_queue(self) -> None: """Listens for and processes events from the SyncEventQueue.""" sys.stderr.write("[DEBUG] _process_event_queue entered\n") sys.stderr.flush() while True: event_name, payload = self.event_queue.get() sys.stderr.write(f"[DEBUG] _process_event_queue got event: {event_name} with payload: {str(payload)[:100]}\n") sys.stderr.flush() if event_name == "shutdown": break if event_name == "user_request": threading.Thread(target=self._handle_request_event, args=(payload,), daemon=True).start() elif event_name == "gui_task": with self._pending_gui_tasks_lock: # Directly append the task from the hook server. # It already contains 'action' and any necessary fields. self._pending_gui_tasks.append(payload) elif event_name == "mma_state_update": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_state_update", "payload": payload }) elif event_name == "mma_stream": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_stream_append", "payload": payload }) elif event_name in ("mma_spawn_approval", "mma_step_approval"): with self._pending_gui_tasks_lock: # These payloads already contain the 'action' field self._pending_gui_tasks.append(payload) elif event_name == "response": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": payload }) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": "response", "payload": payload}) elif event_name == "ticket_started": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "ticket_started", "payload": payload }) elif event_name == "ticket_completed": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "ticket_completed", "payload": payload }) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """Processes a UserRequestEvent by calling the AI client.""" ai_client.set_current_tier(None) # Ensure main discussion is untagged # Clear response area for new turn self.ai_response = "" csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) # Force update adapter path right before send to bypass potential duplication issues self._update_gcli_adapter(self.ui_gemini_cli_path) sys.stderr.write(f"[DEBUG] Calling ai_client.send with provider={ai_client.get_provider()}, model={self.current_model}, gcli_path={self.ui_gemini_cli_path}\n") sys.stderr.flush() try: resp = ai_client.send( event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text, stream=True, stream_callback=lambda text: self._on_ai_stream(text), pre_tool_callback=self._confirm_and_run, qa_callback=ai_client.run_tier4_analysis, patch_callback=ai_client.run_tier4_patch_callback ) self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}) except ai_client.ProviderError as e: sys.stderr.write(f"[DEBUG] _handle_request_event ai_client.ProviderError: {e.ui_message()}\n") sys.stderr.flush() self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}) except Exception as e: import traceback sys.stderr.write(f"[DEBUG] _handle_request_event ERROR: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}) def _on_ai_stream(self, text: str) -> None: """Handles streaming text from the AI.""" self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}) def _on_comms_entry(self, entry: Dict[str, Any]) -> None: session_logger.log_comms(entry) entry["local_ts"] = time.time() kind = entry.get("kind") payload = entry.get("payload", {}) if kind == "response" and "usage" in payload: u = payload["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in u: self.session_usage[k] += u.get(k, 0) or 0 input_t = u.get("input_tokens", 0) output_t = u.get("output_tokens", 0) model = payload.get("model", "unknown") self._token_history.append({ "time": time.time(), "input": input_t, "output": output_t, "model": model }) if kind == "request": if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": payload.get("message", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) if kind in ("tool_result", "tool_call"): role = "Tool" if kind == "tool_result" else "Vendor API" content = "" if kind == "tool_result": content = payload.get("output", "") else: content = payload.get("script") or payload.get("args") or payload.get("message", "") if isinstance(content, dict): content = json.dumps(content, indent=1) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", "collapsed": True, "ts": entry.get("ts", project_manager.now_ts()) }) if kind == "history_add": payload = entry.get("payload", {}) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": payload.get("role", "AI"), "content": payload.get("content", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) return with self._pending_comms_lock: self._pending_comms.append(entry) def _on_tool_log(self, script: str, result: str) -> None: session_logger.log_tool_call(script, result, None) source_tier = ai_client.get_current_tier() with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None: payload = kwargs.get("payload", {}) # Push to background event queue, NOT GUI queue self.event_queue.put("refresh_api_metrics", payload) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": event_name, "payload": payload}) def _on_performance_alert(self, message: str) -> None: alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "System", "content": alert_text, "ts": project_manager.now_ts() }) def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]: sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n") sys.stderr.flush() if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): sys.stderr.write("[DEBUG] Auto-approving script.\n") sys.stderr.flush() self._set_status("running powershell...") output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) self._set_status("powershell done, awaiting AI...") return output sys.stderr.write("[DEBUG] Creating ConfirmDialog.\n") sys.stderr.flush() dialog = ConfirmDialog(script, base_dir) is_headless = "--headless" in sys.argv if is_headless: with self._pending_dialog_lock: self._pending_actions[dialog._uid] = dialog else: with self._pending_dialog_lock: self._pending_dialog = dialog if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): with self._api_event_queue_lock: self._api_event_queue.append({ "type": "script_confirmation_required", "action_id": dialog._uid, "script": str(script), "base_dir": str(base_dir), "ts": time.time() }) sys.stderr.write(f"[DEBUG] Appended script_confirmation_required to _api_event_queue. ID={dialog._uid}\n") sys.stderr.flush() sys.stderr.write(f"[DEBUG] Waiting for dialog ID={dialog._uid}...\n") sys.stderr.flush() approved, final_script = dialog.wait() sys.stderr.write(f"[DEBUG] Dialog ID={dialog._uid} finished wait. approved={approved}\n") sys.stderr.flush() if is_headless: with self._pending_dialog_lock: if dialog._uid in self._pending_actions: del self._pending_actions[dialog._uid] if not approved: self._append_tool_log(final_script, "REJECTED by user") return None self._set_status("running powershell...") output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) self._append_tool_log(final_script, output) self._set_status("powershell done, awaiting AI...") return output def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None: self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) tool_name = self._extract_tool_name(script) is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower() if tool_name: if tool_name not in self._tool_stats: self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0} self._tool_stats[tool_name]["count"] += 1 self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms if is_failure: self._tool_stats[tool_name]["failures"] += 1 self.ui_last_script_text = script self.ui_last_script_output = result self._trigger_script_blink = True self.show_script_output = True if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True def _extract_tool_name(self, script: str) -> str: if not script: return "unknown" script_lower = script.lower() if "powershell" in script_lower or "run_powershell" in script_lower: return "run_powershell" if "read_file" in script_lower: return "read_file" if "write_file" in script_lower or "write" in script_lower: return "write_file" if "list_directory" in script_lower or "ls" in script_lower: return "list_directory" if "search_files" in script_lower or "glob" in script_lower: return "search_files" if "web_search" in script_lower: return "web_search" if "fetch_url" in script_lower: return "fetch_url" if "py_get" in script_lower: return "py_get_skeleton" return "other" def resolve_pending_action(self, action_id: str, approved: bool) -> bool: with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True elif self._pending_dialog and self._pending_dialog._uid == action_id: dialog = self._pending_dialog with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True return False @property def current_provider(self) -> str: return self._current_provider @current_provider.setter def current_provider(self, value: str) -> None: if value != self._current_provider: self._current_provider = value ai_client.reset_session() ai_client.set_provider(value, self.current_model) self._token_stats = {} self._token_stats_dirty = True @property def current_model(self) -> str: return self._current_model @current_model.setter def current_model(self, value: str) -> None: if value != self._current_model: self._current_model = value ai_client.reset_session() ai_client.set_provider(self.current_provider, value) self._token_stats = {} self._token_stats_dirty = True def create_api(self) -> FastAPI: """Creates and configures the FastAPI application for headless mode.""" api = FastAPI(title="Manual Slop Headless API") API_KEY_NAME = "X-API-KEY" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(header_key: str = Depends(api_key_header)) -> str: """Validates the API key from the request header against configuration.""" headless_cfg = self.config.get("headless", {}) config_key = headless_cfg.get("api_key", "").strip() env_key = os.environ.get("SLOP_API_KEY", "").strip() target_key = env_key or config_key if not target_key: raise HTTPException(status_code=403, detail="API Key not configured on server") if header_key == target_key: return header_key raise HTTPException(status_code=403, detail="Could not validate API Key") @api.get("/health") def health() -> dict[str, str]: """Returns the health status of the API.""" return {"status": "ok"} @api.get("/api/gui/state", dependencies=[Depends(get_api_key)]) def get_gui_state() -> dict[str, Any]: """Returns the current GUI state for specific fields.""" gettable = getattr(self, "_gettable_fields", {}) state = {} import dataclasses for key, attr in gettable.items(): val = getattr(self, attr, None) if dataclasses.is_dataclass(val): state[key] = dataclasses.asdict(val) else: state[key] = val return state @api.get("/api/gui/mma_status", dependencies=[Depends(get_api_key)]) def get_mma_status() -> dict[str, Any]: """Dedicated endpoint for MMA-related status.""" return { "mma_status": self.mma_status, "ai_status": self.ai_status, "mma_streams": self.mma_streams, "worker_status": self._worker_status, "tool_stats": self._tool_stats, "active_tier": self.active_tier, "active_tickets": self.active_tickets, "proposed_tracks": self.proposed_tracks } @api.post("/api/gui", dependencies=[Depends(get_api_key)]) def post_gui(req: dict) -> dict[str, str]: """Pushes a GUI task to the event queue.""" self.event_queue.put("gui_task", req) return {"status": "queued"} @api.get("/api/session", dependencies=[Depends(get_api_key)]) def get_api_session() -> dict[str, Any]: """Returns current discussion session entries.""" with self._disc_entries_lock: return {"session": {"entries": self.disc_entries}} @api.post("/api/session", dependencies=[Depends(get_api_key)]) def post_api_session(req: dict) -> dict[str, str]: """Updates session entries.""" entries = req.get("entries", []) with self._disc_entries_lock: self.disc_entries = entries return {"status": "updated"} @api.get("/api/project", dependencies=[Depends(get_api_key)]) def get_api_project() -> dict[str, Any]: """Returns current project data.""" return {"project": self.project} @api.get("/api/performance", dependencies=[Depends(get_api_key)]) def get_performance() -> dict[str, Any]: """Returns performance monitor metrics.""" return {"performance": self.perf_monitor.get_metrics()} @api.get("/api/gui/diagnostics", dependencies=[Depends(get_api_key)]) def get_diagnostics() -> dict[str, Any]: """Alias for performance metrics.""" return self.perf_monitor.get_metrics() @api.get("/status", dependencies=[Depends(get_api_key)]) def status() -> dict[str, Any]: """Returns the current status of the application.""" return { "provider": self.current_provider, "model": self.current_model, "status": self.ai_status, "usage": self.session_usage } @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) def generate(req: GenerateRequest) -> dict[str, Any]: """Triggers an AI generation request using the current project context.""" if not req.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") with self._send_thread_lock: start_time = time.time() try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") user_msg = req.prompt base_dir = self.ui_files_base_dir csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) temp = req.temperature if req.temperature is not None else self.temperature tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens ai_client.set_model_params(temp, tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": user_msg, "collapsed": True, "ts": project_manager.now_ts() }) try: resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "AI", "content": resp, "collapsed": True, "ts": project_manager.now_ts() }) self._recalculate_session_usage() duration = time.time() - start_time return { "text": resp, "metadata": { "provider": self.current_provider, "model": self.current_model, "duration_sec": round(duration, 3), "timestamp": project_manager.now_ts() }, "usage": self.session_usage } except ai_client.ProviderError as e: raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") except Exception as e: raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) async def stream(req: GenerateRequest) -> Any: """Placeholder for streaming AI generation responses (Not yet implemented).""" raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) def pending_actions() -> list[dict[str, Any]]: """Lists all pending PowerShell scripts awaiting confirmation.""" with self._pending_dialog_lock: return [ {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} for uid, diag in self._pending_actions.items() ] @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: """Approves or rejects a pending action.""" with self._pending_dialog_lock: if action_id not in self._pending_actions: raise HTTPException(status_code=404, detail="Action not found") dialog = self._pending_actions.pop(action_id) if req.script is not None: dialog._script = req.script with dialog._condition: dialog._approved = req.approved dialog._done = True dialog._condition.notify_all() return {"status": "confirmed" if req.approved else "rejected"} @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) def list_sessions() -> list[str]: """Lists all session IDs.""" log_dir = paths.get_logs_dir() if not log_dir.exists(): return [] return [d.name for d in log_dir.iterdir() if d.is_dir()] @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def get_session(session_id: str) -> dict[str, Any]: """Returns the content of the comms.log for a specific session.""" log_path = paths.get_logs_dir() / session_id / "comms.log" if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def delete_session(session_id: str) -> dict[str, str]: """Deletes a specific session directory.""" log_path = paths.get_logs_dir() / session_id if not log_path.exists() or not log_path.is_dir(): raise HTTPException(status_code=404, detail="Session directory not found") import shutil shutil.rmtree(log_path) return {"status": "deleted"} @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) def get_context() -> dict[str, Any]: """Returns the current aggregated project context.""" try: md, path, file_items, stable_md, disc_text = self._do_generate() # Pull current screenshots if available in project screenshots = self.project.get("screenshots", {}).get("paths", []) return { "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], "screenshots": screenshots, "files_base_dir": self.ui_files_base_dir, "markdown": md, "discussion": disc_text } except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) def token_stats() -> dict[str, Any]: """Returns current token usage and budget statistics.""" return self._token_stats return api def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem proj = project_manager.default_project(name) project_manager.save_project(proj, user_data) if user_data not in self.project_paths: self.project_paths.append(user_data) self._switch_project(user_data) def _cb_project_save(self) -> None: self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) self._set_status("config saved") def _cb_disc_create(self) -> None: nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm) self.ui_disc_new_name_input = "" def _switch_project(self, path: str) -> None: if not Path(path).exists(): self._set_status(f"project file not found: {path}") return self._flush_to_project() self._save_active_project() try: self.project = project_manager.load_project(path) self.active_project_path = path except Exception as e: self._set_status(f"failed to load project: {e}") return self._refresh_from_project() self._set_status(f"switched to: {Path(path).stem}") def _refresh_from_project(self) -> None: self.files = list(self.project.get("files", {}).get("paths", [])) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System"])) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) proj = self.project self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_project_main_context = proj_meta.get("main_context", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) self.ui_summary_only = proj.get("project", {}).get("summary_only", False) agent_tools_cfg = proj.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} # MMA Tracks self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) # Restore MMA state mma_sec = proj.get("mma", {}) self.ui_epic_input = mma_sec.get("epic", "") at_data = mma_sec.get("active_track") if at_data: try: tickets = [] for t_data in at_data.get("tickets", []): tickets.append(models.Ticket(**t_data)) self.active_track = models.Track( id=at_data.get("id"), description=at_data.get("description"), tickets=tickets ) self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table except Exception as e: print(f"Failed to deserialize active track: {e}") self.active_track = None else: self.active_track = None self.active_tickets = [] # Load track-scoped history if track is active if self.active_track: track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) if track_history: with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(track_history, self.disc_roles) def _cb_load_track(self, track_id: str) -> None: state = project_manager.load_track_state(track_id, self.ui_files_base_dir) if state: try: # Convert list[Ticket] or list[dict] to list[Ticket] for Track object tickets = [] for t in state.tasks: if isinstance(t, dict): tickets.append(models.Ticket(**t)) else: tickets.append(t) self.active_track = models.Track( id=state.metadata.id, description=state.metadata.name, tickets=tickets ) # Keep dicts for UI table (or convert models.Ticket objects back to dicts if needed) self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] # Load track-scoped history history = project_manager.load_track_history(track_id, self.ui_files_base_dir) with self._disc_entries_lock: if history: self.disc_entries = models.parse_history_entries(history, self.disc_roles) else: self.disc_entries = [] self._recalculate_session_usage() self._set_status(f"Loaded track: {state.metadata.name}") except Exception as e: self._set_status(f"Load track error: {e}") print(f"Error loading track {track_id}: {e}") def _save_active_project(self) -> None: if self.active_project_path: try: project_manager.save_project(self.project, self.active_project_path) except Exception as e: self._set_status(f"save error: {e}") def _get_discussion_names(self) -> list[str]: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) return sorted(discussions.keys()) def _switch_discussion(self, name: str) -> None: self._flush_disc_entries_to_project() disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if name not in discussions: self._set_status(f"discussion not found: {name}") return self.active_discussion = name self._track_discussion_active = False disc_sec["active"] = name disc_data = discussions[name] with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) self._set_status(f"discussion: {name}") def _flush_disc_entries_to_project(self) -> None: history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] if self.active_track and self._track_discussion_active: project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) return disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) disc_data["history"] = history_strings disc_data["last_updated"] = project_manager.now_ts() def _create_discussion(self, name: str) -> None: disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) if name in discussions: self._set_status(f"discussion '{name}' already exists") return discussions[name] = project_manager.default_discussion() self._switch_discussion(name) def _rename_discussion(self, old_name: str, new_name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if old_name not in discussions: return if new_name in discussions: self._set_status(f"discussion '{new_name}' already exists") return discussions[new_name] = discussions.pop(old_name) if self.active_discussion == old_name: self.active_discussion = new_name disc_sec["active"] = new_name def _delete_discussion(self, name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if len(discussions) <= 1: self._set_status("cannot delete the last discussion") return if name not in discussions: return del discussions[name] if self.active_discussion == name: remaining = sorted(discussions.keys()) self._switch_discussion(remaining[0]) def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: if self._pending_mma_approval: dlg = self._pending_mma_approval.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved if payload is not None: dlg._payload = payload dlg._done = True dlg._condition.notify_all() self._pending_mma_approval = None if self._pending_mma_spawn: spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0] if spawn_dlg: with spawn_dlg._condition: spawn_dlg._approved = approved spawn_dlg._abort = abort if prompt is not None: spawn_dlg._prompt = prompt if context_md is not None: spawn_dlg._context_md = context_md spawn_dlg._done = True spawn_dlg._condition.notify_all() self._pending_mma_spawn = None def _handle_approve_ask(self) -> None: """Responds with approval for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": True}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reject_ask(self) -> None: """Responds with rejection for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": False}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reset_session(self) -> None: """Logic for resetting the AI session and GUI state.""" ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._tool_stats.clear() self._comms_log.clear() self.disc_entries.clear() # Clear history in ALL discussions to be safe disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) for d_name in discussions: discussions[d_name]["history"] = [] self._set_status("session reset") self.ai_response = "" self.ui_ai_input = "" self.ui_manual_approve = False self.ui_auto_add_history = False self._current_provider = "gemini" self._current_model = "gemini-2.5-flash-lite" ai_client.set_provider(self._current_provider, self._current_model) with self._pending_history_adds_lock: self._pending_history_adds.clear() with self._api_event_queue_lock: self._api_event_queue.clear() with self._pending_gui_tasks_lock: self._pending_gui_tasks.clear() def _handle_md_only(self) -> None: """Logic for the 'MD Only' action.""" def worker(): try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self._set_status(f"md written: {path.name}") # Refresh token budget metrics with CURRENT md self._refresh_api_metrics({}, md_content=md) except Exception as e: self._set_status(f"error: {e}") threading.Thread(target=worker, daemon=True).start() def _handle_generate_send(self) -> None: """Logic for the 'Gen + Send' action.""" def worker(): sys.stderr.write("[DEBUG] _handle_generate_send worker started\n") sys.stderr.flush() try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items self._set_status("sending...") user_msg = self.ui_ai_input base_dir = self.ui_files_base_dir sys.stderr.write(f"[DEBUG] _do_generate success. Prompt: {user_msg[:50]}...\n") sys.stderr.flush() # Prepare event payload event_payload = events.UserRequestEvent( prompt=user_msg, stable_md=stable_md, file_items=file_items, disc_text=disc_text, base_dir=base_dir ) # Push to async queue self.event_queue.put("user_request", event_payload) sys.stderr.write("[DEBUG] Enqueued user_request event\n") sys.stderr.flush() except Exception as e: import traceback sys.stderr.write(f"[DEBUG] _do_generate ERROR: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() self._set_status(f"generate error: {e}") threading.Thread(target=worker, daemon=True).start() def _recalculate_session_usage(self) -> None: usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} for entry in ai_client.get_comms_log(): if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): u = entry["payload"]["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in usage: usage[k] += u.get(k, 0) or 0 self.session_usage = usage # Update cached files list stats = ai_client.get_gemini_cache_stats() self._cached_files = stats.get("cached_files", []) def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: if "latency" in payload: self.session_usage["last_latency"] = payload["latency"] self._recalculate_session_usage() if md_content is not None: stats = ai_client.get_token_stats(md_content) # Ensure compatibility if keys are named differently if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: stats["estimated_prompt_tokens"] = stats["total_tokens"] self._token_stats = stats cache_stats = payload.get("cache_stats") if cache_stats: count = cache_stats.get("cache_count", 0) size_bytes = cache_stats.get("total_size_bytes", 0) self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" self._update_cached_stats() def _update_cached_stats(self) -> None: import ai_client self._cached_cache_stats = ai_client.get_gemini_cache_stats() self._cached_tool_stats = dict(self._tool_stats) def clear_cache(self) -> None: import ai_client ai_client.cleanup() self._update_cached_stats() def get_session_insights(self) -> Dict[str, Any]: from src import cost_tracker total_input = sum(e["input"] for e in self._token_history) total_output = sum(e["output"] for e in self._token_history) total_tokens = total_input + total_output elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0 burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0 session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output) completed = sum(1 for t in self.active_tickets if t.get("status") == "complete") efficiency = total_tokens / completed if completed > 0 else 0 return { "total_tokens": total_tokens, "total_input": total_input, "total_output": total_output, "elapsed_min": elapsed_min, "burn_rate": burn_rate, "session_cost": session_cost, "completed_tickets": completed, "efficiency": efficiency, "call_count": len(self._token_history) } def _flush_to_project(self) -> None: proj = self.project proj.setdefault("output", {})["output_dir"] = self.ui_output_dir proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir proj["files"]["paths"] = self.files proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir proj["screenshots"]["paths"] = self.screenshots proj.setdefault("project", {}) proj["project"]["git_dir"] = self.ui_project_git_dir proj["project"]["system_prompt"] = self.ui_project_system_prompt proj["project"]["main_context"] = self.ui_project_main_context proj["project"]["word_wrap"] = self.ui_word_wrap proj["project"]["summary_only"] = self.ui_summary_only proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path proj.setdefault("agent", {}).setdefault("tools", {}) for t_name in models.AGENT_TOOL_NAMES: proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) self._flush_disc_entries_to_project() disc_sec = proj.setdefault("discussion", {}) disc_sec["roles"] = self.disc_roles disc_sec["active"] = self.active_discussion disc_sec["auto_add"] = self.ui_auto_add_history # Save MMA State mma_sec = proj.setdefault("mma", {}) mma_sec["epic"] = self.ui_epic_input mma_sec["tier_models"] = {t: {"model": d["model"], "provider": d.get("provider", "gemini")} for t, d in self.mma_tier_usage.items()} if self.active_track: mma_sec["active_track"] = asdict(self.active_track) else: mma_sec["active_track"] = None def _flush_to_config(self) -> None: self.config["ai"] = { "provider": self.current_provider, "model": self.current_model, "temperature": self.temperature, "max_tokens": self.max_tokens, "history_trunc_limit": self.history_trunc_limit, } self.config["ai"]["system_prompt"] = self.ui_global_system_prompt self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} self.config["gui"] = { "show_windows": self.show_windows, "separate_message_panel": getattr(self, "ui_separate_message_panel", False), "separate_response_panel": getattr(self, "ui_separate_response_panel", False), "separate_tool_calls_panel": getattr(self, "ui_separate_tool_calls_panel", False), } theme.save_to_config(self.config) def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) track_id = self.active_track.id if self.active_track else None flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) full_md, path, file_items = aggregate.run(flat) # Build stable markdown (no history) for Gemini caching screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) screenshots = flat.get("screenshots", {}).get("paths", []) summary_only = flat.get("project", {}).get("summary_only", False) stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) # Build discussion history text separately history = flat.get("discussion", {}).get("history", []) discussion_text = aggregate.build_discussion_text(history) return full_md, path, file_items, stable_md, discussion_text def _cb_plan_epic(self) -> None: def _bg_task() -> None: sys.stderr.write("[DEBUG] _cb_plan_epic _bg_task started\n") sys.stderr.flush() try: self._set_status("Planning Epic (Tier 1)...") history = orchestrator_pm.get_track_history_summary() sys.stderr.write(f"[DEBUG] History summary length: {len(history)}\n") sys.stderr.flush() proj = project_manager.load_project(self.active_project_path) flat = project_manager.flat_config(self.project) file_items = aggregate.build_file_items(Path(self.ui_files_base_dir), flat.get("files", {}).get("paths", [])) _t1_baseline = len(ai_client.get_comms_log()) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) _t1_new = ai_client.get_comms_log()[_t1_baseline:] _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp) _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp) def _push_t1_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 1"]["input"] += i self.mma_tier_usage["Tier 1"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t1_usage, "args": [_t1_in, _t1_out] }) self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": { "text": json.dumps(tracks, indent=2), "stream_id": "Tier 1", "status": "Epic tracks generated." } }) self._pending_gui_tasks.append({ "action": "show_track_proposal", "payload": tracks }) except Exception as e: self._set_status(f"Epic plan error: {e}") print(f"ERROR in _cb_plan_epic background task: {e}") threading.Thread(target=_bg_task, daemon=True).start() def _cb_accept_tracks(self) -> None: self._show_track_proposal_modal = False def _bg_task() -> None: sys.stderr.write("[DEBUG] _cb_accept_tracks _bg_task started\n") # Generate skeletons once self._set_status("Phase 2: Generating skeletons for all tracks...") sys.stderr.write("[DEBUG] Creating ASTParser...\n") parser = ASTParser(language="python") generated_skeletons = "" try: # Use a local copy of files to avoid concurrent modification issues files_to_scan = list(self.files) sys.stderr.write(f"[DEBUG] Scanning {len(files_to_scan)} files for skeletons...\n") for i, file_path in enumerate(files_to_scan): try: self._set_status(f"Phase 2: Scanning files ({i+1}/{len(files_to_scan)})...") abs_path = Path(self.ui_files_base_dir) / file_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() generated_skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n" except Exception as e: sys.stderr.write(f"[DEBUG] Error parsing skeleton for {file_path}: {e}\n") except Exception as e: sys.stderr.write(f"[DEBUG] Error in scan loop: {e}\n") self._set_status(f"Error generating skeletons: {e}") return # Exit if skeleton generation fails sys.stderr.write("[DEBUG] Skeleton generation complete. Starting tracks...\n") # Now loop through tracks and call _start_track_logic with generated skeletons total_tracks = len(self.proposed_tracks) for i, track_data in enumerate(self.proposed_tracks): title = track_data.get("title") or track_data.get("goal", "Untitled Track") self._set_status(f"Processing track {i+1} of {total_tracks}: '{title}'...") self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons sys.stderr.write("[DEBUG] All tracks started. Refreshing...\n") with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started self._set_status(f"All {total_tracks} tracks accepted and execution started.") threading.Thread(target=_bg_task, daemon=True).start() def _cb_start_track(self, user_data: Any = None) -> None: if isinstance(user_data, str): # If track_id is provided directly track_id = user_data # Ensure it's loaded as active if not self.active_track or self.active_track.id != track_id: self._cb_load_track(track_id) if self.active_track: # Use the active track object directly to start execution self._set_mma_status("running") engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) full_md, _, _ = aggregate.run(flat) threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() self._set_status(f"Track '{self.active_track.description}' started.") return idx = 0 if isinstance(user_data, int): idx = user_data elif isinstance(user_data, dict): idx = user_data.get("index", 0) if 0 <= idx < len(self.proposed_tracks): track_data = self.proposed_tracks[idx] title = track_data.get("title") or track_data.get("goal", "Untitled Track") threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() self._set_status(f"Track '{title}' started.") def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None: try: goal = track_data.get("goal", "") title = track_data.get("title") or track_data.get("goal", "Untitled Track") self._set_status(f"Phase 2: Generating tickets for {title}...") skeletons = skeletons_str or "" # Use provided skeletons or empty self._set_status("Phase 2: Calling Tech Lead...") _t2_baseline = len(ai_client.get_comms_log()) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) _t2_new = ai_client.get_comms_log()[_t2_baseline:] _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) def _push_t2_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 2"]["input"] += i self.mma_tier_usage["Tier 2"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t2_usage, "args": [_t2_in, _t2_out] }) if not raw_tickets: self._set_status(f"Error: No tickets generated for track: {title}") print(f"Warning: No tickets generated for track: {title}") return self._set_status("Phase 2: Sorting tickets...") try: sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) except ValueError as e: print(f"Dependency error in track '{title}': {e}") sorted_tickets_data = raw_tickets # 3. Create Track and Ticket objects tickets = [] for t_data in sorted_tickets_data: ticket = models.Ticket( id=t_data["id"], description=t_data.get("description") or t_data.get("goal", "No description"), status=t_data.get("status", "todo"), assigned_to=t_data.get("assigned_to", "unassigned"), depends_on=t_data.get("depends_on", []), step_mode=t_data.get("step_mode", False) ) tickets.append(ticket) track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" track = models.Track(id=track_id, description=title, tickets=tickets) # Initialize track state in the filesystem meta = models.Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now()) state = models.TrackState(metadata=meta, discussion=[], tasks=tickets) project_manager.save_track_state(track_id, state, self.ui_files_base_dir) # Add to memory and notify UI self.tracks.append({"id": track_id, "title": title, "status": "todo"}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # 4. Initialize ConductorEngine and run loop engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) # Use current full markdown context for the track execution track_id_param = track.id flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) full_md, _, _ = aggregate.run(flat) # Start the engine in a separate thread threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() except Exception as e: self._set_status(f"Track start error: {e}") print(f"ERROR in _start_track_logic: {e}") def _cb_ticket_retry(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'todo' break self.event_queue.put("mma_retry", {"ticket_id": ticket_id}) def _cb_ticket_skip(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'skipped' break self.event_queue.put("mma_skip", {"ticket_id": ticket_id}) def _cb_run_conductor_setup(self) -> None: base = paths.get_conductor_dir() if not base.exists(): self.ui_conductor_setup_summary = f"Error: {base}/ directory not found." return files = list(base.glob("**/*")) files = [f for f in files if f.is_file()] summary = [f"Conductor Directory: {base.absolute()}"] summary.append(f"Total Files: {len(files)}") total_lines = 0 for f in files: try: with open(f, "r", encoding="utf-8") as fd: lines = len(fd.readlines()) total_lines += lines summary.append(f"- {f.relative_to(base)}: {lines} lines") except Exception: summary.append(f"- {f.relative_to(base)}: Error reading") summary.append(f"Total Line Count: {total_lines}") tracks_dir = base / "tracks" if tracks_dir.exists(): tracks = [d for d in tracks_dir.iterdir() if d.is_dir()] summary.append(f"Total Tracks Found: {len(tracks)}") else: summary.append("Tracks Directory: Not found") self.ui_conductor_setup_summary = "\n".join(summary) def _cb_create_track(self, name: str, desc: str, track_type: str) -> None: if not name: return date_suffix = datetime.now().strftime("%Y%m%d") track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}" track_dir = paths.get_tracks_dir() / track_id track_dir.mkdir(parents=True, exist_ok=True) spec_file = track_dir / "spec.md" with open(spec_file, "w", encoding="utf-8") as f: f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n") plan_file = track_dir / "plan.md" with open(plan_file, "w", encoding="utf-8") as f: f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n") meta_file = track_dir / "metadata.json" with open(meta_file, "w", encoding="utf-8") as f: json.dump({ "id": track_id, "title": name, "description": desc, "type": track_type, "status": "new", "progress": 0.0 }, f, indent=1) # Refresh tracks from disk self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) def _push_mma_state_update(self) -> None: if not self.active_track: return # Sync active_tickets (list of dicts) back to active_track.tickets (list of models.Ticket objects) self.active_track.tickets = [models.Ticket.from_dict(t) for t in self.active_tickets] # Save the state to disk existing = project_manager.load_track_state(self.active_track.id, self.ui_files_base_dir) meta = models.Metadata( id=self.active_track.id, name=self.active_track.description, status=self.mma_status, created_at=existing.metadata.created_at if existing else datetime.now(), updated_at=datetime.now() ) state = models.TrackState( metadata=meta, discussion=existing.discussion if existing else [], tasks=self.active_track.tickets ) project_manager.save_track_state(self.active_track.id, state, self.ui_files_base_dir)