import queue import threading import time import sys import os from typing import Any, List, Dict, Optional, Tuple, Callable, Union, cast from pathlib import Path import json import uuid import tomli_w import requests from dataclasses import asdict from datetime import datetime from tkinter import filedialog, Tk from fastapi import FastAPI, Depends, HTTPException from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel from src import events from src import session_logger from src import project_manager from src import performance_monitor from src import models from src.log_registry import LogRegistry from src.log_pruner import LogPruner from src.file_cache import ASTParser from src import ai_client from src import shell_runner from src import mcp_client from src import aggregate from src import orchestrator_pm from src import conductor_tech_lead from src import cost_tracker from src import multi_agent_conductor from src import theme from src.ai_client import ProviderError def save_config(config: dict[str, Any]) -> None: with open(models.CONFIG_PATH, "wb") as f: tomli_w.dump(config, f) def hide_tk_root() -> Tk: root = Tk() root.withdraw() root.wm_attributes("-topmost", True) return root class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True temperature: float | None = None max_tokens: int | None = None class ConfirmRequest(BaseModel): approved: bool script: Optional[str] = None class ConfirmDialog: def __init__(self, script: str, base_dir: str) -> None: self._uid = str(uuid.uuid4()) self._script = str(script) if script is not None else "" self._base_dir = str(base_dir) if base_dir is not None else "" self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._script self._condition.wait(timeout=0.1) return self._approved, self._script class MMAApprovalDialog: def __init__(self, ticket_id: str, payload: str) -> None: self._payload = payload self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._payload self._condition.wait(timeout=0.1) return self._approved, self._payload class MMASpawnApprovalDialog: def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: self._prompt = prompt self._context_md = context_md self._condition = threading.Condition() self._done = False self._approved = False self._abort = False def wait(self) -> dict[str, Any]: start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md} self._condition.wait(timeout=0.1) return { 'approved': self._approved, 'abort': self._abort, 'prompt': self._prompt, 'context_md': self._context_md } class AppController: """ The headless controller for the Manual Slop application. Owns the application state and manages background services. """ PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"] def __init__(self): # Initialize locks first to avoid initialization order issues self._send_thread_lock: threading.Lock = threading.Lock() self._disc_entries_lock: threading.Lock = threading.Lock() self._pending_comms_lock: threading.Lock = threading.Lock() self._pending_tool_calls_lock: threading.Lock = threading.Lock() self._pending_history_adds_lock: threading.Lock = threading.Lock() self._pending_gui_tasks_lock: threading.Lock = threading.Lock() self._pending_dialog_lock: threading.Lock = threading.Lock() self._api_event_queue_lock: threading.Lock = threading.Lock() self.config: Dict[str, Any] = {} self.project: Dict[str, Any] = {} self.active_project_path: str = "" self.project_paths: List[str] = [] self.active_discussion: str = "main" self.disc_entries: List[Dict[str, Any]] = [] self.disc_roles: List[str] = [] self.files: List[str] = [] self.screenshots: List[str] = [] self.event_queue: events.SyncEventQueue = events.SyncEventQueue() self._loop_thread: Optional[threading.Thread] = None self.tracks: List[Dict[str, Any]] = [] self.active_track: Optional[models.Track] = None self.active_tickets: List[Dict[str, Any]] = [] self.mma_streams: Dict[str, str] = {} self.mma_status: str = "idle" self._tool_log: List[Dict[str, Any]] = [] self._comms_log: List[Dict[str, Any]] = [] self.session_usage: Dict[str, Any] = { "input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "last_latency": 0.0 } self.mma_tier_usage: Dict[str, Dict[str, Any]] = { "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"}, "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"}, "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, } self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor() self._pending_gui_tasks: List[Dict[str, Any]] = [] self._api_event_queue: List[Dict[str, Any]] = [] # Pending dialogs state moved from App self._pending_dialog: Optional[ConfirmDialog] = None self._pending_dialog_open: bool = False self._pending_actions: Dict[str, ConfirmDialog] = {} self._pending_ask_dialog: bool = False # AI settings state self._current_provider: str = "gemini" self._current_model: str = "gemini-2.5-flash-lite" self.temperature: float = 0.0 self.max_tokens: int = 8192 self.history_trunc_limit: int = 8000 # UI-related state moved to controller self.ui_ai_input: str = "" self.ui_disc_new_name_input: str = "" self.ui_disc_new_role_input: str = "" self.ui_epic_input: str = "" self.ui_new_track_name: str = "" self.ui_new_track_desc: str = "" self.ui_new_track_type: str = "feature" self.ui_conductor_setup_summary: str = "" self.ui_last_script_text: str = "" self.ui_last_script_output: str = "" self.ui_new_ticket_id: str = "" self.ui_new_ticket_desc: str = "" self.ui_new_ticket_target: str = "" self.ui_new_ticket_deps: str = "" self.ui_output_dir: str = "" self.ui_files_base_dir: str = "" self.ui_shots_base_dir: str = "" self.ui_project_git_dir: str = "" self.ui_project_main_context: str = "" self.ui_project_system_prompt: str = "" self.ui_gemini_cli_path: str = "gemini" self.ui_word_wrap: bool = True self.ui_summary_only: bool = False self.ui_auto_add_history: bool = False self.ui_global_system_prompt: str = "" self.ui_agent_tools: Dict[str, bool] = {} self.available_models: List[str] = [] self.proposed_tracks: List[Dict[str, Any]] = [] self._show_track_proposal_modal: bool = False self.ai_status: str = 'idle' self.ai_response: str = '' self.last_md: str = '' self.last_md_path: Optional[Path] = None self.last_file_items: List[Any] = [] self.send_thread: Optional[threading.Thread] = None self.models_thread: Optional[threading.Thread] = None self.show_windows: Dict[str, bool] = {} self.show_script_output: bool = False self.show_text_viewer: bool = False self.text_viewer_title: str = '' self.text_viewer_content: str = '' self._pending_comms: List[Dict[str, Any]] = [] self._pending_tool_calls: List[Dict[str, Any]] = [] self._pending_history_adds: List[Dict[str, Any]] = [] self.perf_history: Dict[str, List[float]] = {'frame_time': [0.0]*100, 'fps': [0.0]*100, 'cpu': [0.0]*100, 'input_lag': [0.0]*100} self._perf_last_update: float = 0.0 self._autosave_interval: float = 60.0 self._last_autosave: float = time.time() # More state moved from App self._ask_dialog_open: bool = False self._ask_request_id: Optional[str] = None self._ask_tool_data: Optional[Dict[str, Any]] = None self.mma_step_mode: bool = False self.active_tier: Optional[str] = None self.ui_focus_agent: Optional[str] = None self._pending_mma_approval: Optional[Dict[str, Any]] = None self._mma_approval_open: bool = False self._mma_approval_edit_mode: bool = False self._mma_approval_payload: str = "" self._pending_mma_spawn: Optional[Dict[str, Any]] = None self._mma_spawn_open: bool = False self._mma_spawn_edit_mode: bool = False self._mma_spawn_prompt: str = '' self._mma_spawn_context: str = '' self._trigger_blink: bool = False self._is_blinking: bool = False self._blink_start_time: float = 0.0 self._trigger_script_blink: bool = False self._is_script_blinking: bool = False self._script_blink_start_time: float = 0.0 self._scroll_disc_to_bottom: bool = False self._scroll_comms_to_bottom: bool = False self._scroll_tool_calls_to_bottom: bool = False self._gemini_cache_text: str = "" self._last_stable_md: str = '' self._token_stats: Dict[str, Any] = {} self._token_stats_dirty: bool = False self.ui_disc_truncate_pairs: int = 2 self.ui_auto_scroll_comms: bool = True self.ui_auto_scroll_tool_calls: bool = True self._show_add_ticket_form: bool = False self._track_discussion_active: bool = False self._tier_stream_last_len: Dict[str, int] = {} self.is_viewing_prior_session: bool = False self.prior_session_entries: List[Dict[str, Any]] = [] self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1") self.ui_manual_approve: bool = False self._settable_fields: Dict[str, str] = { 'ai_input': 'ui_ai_input', 'project_git_dir': 'ui_project_git_dir', 'auto_add_history': 'ui_auto_add_history', 'disc_new_name_input': 'ui_disc_new_name_input', 'project_main_context': 'ui_project_main_context', 'gcli_path': 'ui_gemini_cli_path', 'output_dir': 'ui_output_dir', 'files_base_dir': 'ui_files_base_dir', 'ai_status': 'ai_status', 'ai_response': 'ai_response', 'active_discussion': 'active_discussion', 'current_provider': 'current_provider', 'current_model': 'current_model', 'token_budget_pct': '_token_budget_pct', 'token_budget_current': '_token_budget_current', 'token_budget_label': '_token_budget_label', 'show_confirm_modal': 'show_confirm_modal', 'mma_epic_input': 'ui_epic_input', 'mma_status': 'mma_status', 'mma_active_tier': 'active_tier', 'ui_new_track_name': 'ui_new_track_name', 'ui_new_track_desc': 'ui_new_track_desc', 'manual_approve': 'ui_manual_approve' } self._gettable_fields = dict(self._settable_fields) self._gettable_fields.update({ 'ui_focus_agent': 'ui_focus_agent', 'active_discussion': 'active_discussion', '_track_discussion_active': '_track_discussion_active', 'proposed_tracks': 'proposed_tracks', 'mma_streams': 'mma_streams', 'active_track': 'active_track', 'active_tickets': 'active_tickets', 'tracks': 'tracks', 'thinking_indicator': 'thinking_indicator', 'operations_live_indicator': 'operations_live_indicator', 'prior_session_indicator': 'prior_session_indicator' }) self._init_actions() @property def thinking_indicator(self) -> bool: return self.ai_status in ("sending...", "streaming...") @property def operations_live_indicator(self) -> bool: return not self.is_viewing_prior_session @property def prior_session_indicator(self) -> bool: return self.is_viewing_prior_session def _init_actions(self) -> None: # Set up state-related action maps self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, 'btn_md_only': self._handle_md_only, 'btn_approve_script': self._handle_approve_script, 'btn_reject_script': self._handle_reject_script, 'btn_project_save': self._cb_project_save, 'btn_disc_create': self._cb_disc_create, 'btn_mma_plan_epic': self._cb_plan_epic, 'btn_mma_accept_tracks': self._cb_accept_tracks, 'btn_mma_start_track': self._cb_start_track, 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), 'btn_approve_tool': self._handle_approve_ask, 'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True), 'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True), } self._predefined_callbacks: dict[str, Callable[..., Any]] = { '_test_callback_func_write_to_file': self._test_callback_func_write_to_file, '_set_env_var': lambda k, v: os.environ.update({k: v}) } def _update_gcli_adapter(self, path: str) -> None: sys.stderr.write(f"[DEBUG] _update_gcli_adapter called with: {path}\n") sys.stderr.flush() if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(path)) else: ai_client._gemini_cli_adapter.binary_path = str(path) def _set_status(self, status: str) -> None: """Thread-safe update of ai_status via the GUI task queue.""" with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "set_ai_status", "payload": status }) def _set_mma_status(self, status: str) -> None: """Thread-safe update of mma_status via the GUI task queue.""" with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "set_mma_status", "payload": status }) def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: return with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] self._pending_gui_tasks.clear() for task in tasks: try: action = task.get("action") if action: session_logger.log_api_hook("PROCESS_TASK", action, str(task)) # ... if action == "refresh_api_metrics": self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None) elif action == "set_ai_status": self.ai_status = task.get("payload", "") sys.stderr.write(f"[DEBUG] Updated ai_status via task to: {self.ai_status}\n") sys.stderr.flush() elif action == "set_mma_status": self.mma_status = task.get("payload", "") elif action == "handle_ai_response": payload = task.get("payload", {}) text = payload.get("text", "") stream_id = payload.get("stream_id") is_streaming = payload.get("status") == "streaming..." if stream_id: if is_streaming: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" self.mma_streams[stream_id] += text else: self.mma_streams[stream_id] = text if stream_id == "Tier 1": if "status" in payload: self.ai_status = payload["status"] else: if is_streaming: self.ai_response += text else: self.ai_response = text self.ai_status = payload.get("status", "done") sys.stderr.write(f"[DEBUG] Updated ai_status to: {self.ai_status}\n") sys.stderr.flush() self._trigger_blink = True if not stream_id: self._token_stats_dirty = True # ONLY add to history when turn is complete if self.ui_auto_add_history and not stream_id and not is_streaming: role = payload.get("role", "AI") with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts() }) elif action in ("mma_stream", "mma_stream_append"): # Some events might have these at top level, some in a 'payload' dict stream_id = task.get("stream_id") or task.get("payload", {}).get("stream_id") text = task.get("text") or task.get("payload", {}).get("text", "") if stream_id: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" self.mma_streams[stream_id] += text elif action == "show_track_proposal": self.proposed_tracks = task.get("payload", []) self._show_track_proposal_modal = True elif action == "mma_state_update": # Handle both internal (nested) and hook-server (flattened) payloads payload = task.get("payload") if not isinstance(payload, dict): payload = task # Fallback to task if payload missing or wrong type self.mma_status = payload.get("status", "idle") self.active_tier = payload.get("active_tier") self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage) self.active_tickets = payload.get("tickets", []) track_data = payload.get("track") if track_data: tickets = [] for t_data in self.active_tickets: tickets.append(models.Ticket(**t_data)) self.active_track = models.Track( id=track_data.get("id"), description=track_data.get("title", ""), tickets=tickets ) elif action == "set_value": item = task.get("item") value = task.get("value") sys.stderr.write(f"[DEBUG] Processing set_value: {item}={value}\n") sys.stderr.flush() if item in self._settable_fields: attr_name = self._settable_fields[item] setattr(self, attr_name, value) sys.stderr.write(f"[DEBUG] Set {attr_name} to {value}\n") sys.stderr.flush() if item == "gcli_path": self._update_gcli_adapter(str(value)) elif action == "click": item = task.get("item") user_data = task.get("user_data") sys.stderr.write(f"[DEBUG] Processing click: {item} (user_data={user_data})\n") sys.stderr.flush() if item == "btn_project_new_automated": self._cb_new_project_automated(user_data) elif item == "btn_mma_load_track": self._cb_load_track(str(user_data or "")) elif item in self._clickable_actions: import inspect func = self._clickable_actions[item] try: sig = inspect.signature(func) if 'user_data' in sig.parameters: func(user_data=user_data) else: func() except Exception: func() elif action == "select_list_item": item = task.get("listbox", task.get("item")) value = task.get("item_value", task.get("value")) if item == "disc_listbox": self._switch_discussion(str(value or "")) elif task.get("type") == "ask": self._pending_ask_dialog = True self._ask_request_id = task.get("request_id") self._ask_tool_data = task.get("data", {}) elif action == "clear_ask": if self._ask_request_id == task.get("request_id"): self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None elif action == "custom_callback": cb = task.get("callback") args = task.get("args", []) if callable(cb): try: cb(*args) except Exception as e: print(f"Error in direct custom callback: {e}") elif cb in self._predefined_callbacks: self._predefined_callbacks[cb](*args) elif action == "mma_step_approval": dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or "")) self._pending_mma_approval = task if "dialog_container" in task: task["dialog_container"][0] = dlg elif action == 'refresh_from_project': self._refresh_from_project() elif action == "mma_spawn_approval": spawn_dlg = MMASpawnApprovalDialog( str(task.get("ticket_id") or ""), str(task.get("role") or ""), str(task.get("prompt") or ""), str(task.get("context_md") or "") ) self._pending_mma_spawn = task self._mma_spawn_prompt = task.get("prompt", "") self._mma_spawn_context = task.get("context_md", "") self._mma_spawn_open = True self._mma_spawn_edit_mode = False if "dialog_container" in task: task["dialog_container"][0] = spawn_dlg except Exception as e: import traceback sys.stderr.write(f"[DEBUG] Error executing GUI task: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() print(f"Error executing GUI task: {e}") def _process_pending_history_adds(self) -> None: """Synchronizes pending history entries to the active discussion and project state.""" with self._pending_history_adds_lock: items = self._pending_history_adds[:] self._pending_history_adds.clear() if not items: return self._scroll_disc_to_bottom = True for item in items: item.get("role", "unknown") if item.get("role") and item["role"] not in self.disc_roles: self.disc_roles.append(item["role"]) disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) disc_data = discussions.get(self.active_discussion) if disc_data is not None: if item.get("disc_title", self.active_discussion) == self.active_discussion: if self.disc_entries is not disc_data.get("history"): if "history" not in disc_data: disc_data["history"] = [] disc_data["history"].append(project_manager.entry_to_str(item)) disc_data["last_updated"] = project_manager.now_ts() with self._disc_entries_lock: self.disc_entries.append(item) def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" callback_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests", "artifacts", "temp_callback_output.txt") os.makedirs(os.path.dirname(callback_path), exist_ok=True) with open(callback_path, "w") as f: f.write(data) def _handle_approve_script(self, user_data=None) -> None: """Approves the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def _handle_reject_script(self, user_data=None) -> None: """Rejects the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def init_state(self): """Initializes the application state from configurations.""" self.config = models.load_config() ai_cfg = self.config.get("ai", {}) self._current_provider = ai_cfg.get("provider", "gemini") self._current_model = ai_cfg.get("model", "gemini-2.5-flash-lite") self.temperature = ai_cfg.get("temperature", 0.0) self.max_tokens = ai_cfg.get("max_tokens", 8192) self.history_trunc_limit = ai_cfg.get("history_trunc_limit", 8000) projects_cfg = self.config.get("projects", {}) self.project_paths = list(projects_cfg.get("paths", [])) self.active_project_path = projects_cfg.get("active", "") self._load_active_project() self.files = list(self.project.get("files", {}).get("paths", [])) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", list(models.DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) # UI state self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_main_context = proj_meta.get("main_context", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self._update_gcli_adapter(self.ui_gemini_cli_path) self.ui_word_wrap = proj_meta.get("word_wrap", True) self.ui_summary_only = proj_meta.get("summary_only", False) self.ui_auto_add_history = disc_sec.get("auto_add", False) self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "") _default_windows = { "Context Hub": True, "Files & Media": True, "AI Settings": True, "MMA Dashboard": True, "Tier 1: Strategy": True, "Tier 2: Tech Lead": True, "Tier 3: Workers": True, "Tier 4: QA": True, "Discussion Hub": True, "Operations Hub": True, "Theme": True, "Log Management": False, "Diagnostics": False, } saved = self.config.get("gui", {}).get("show_windows", {}) self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()} agent_tools_cfg = self.project.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} label = self.project.get("project", {}).get("name", "") session_logger.open_session(label=label) def cb_load_prior_log(self) -> None: root = hide_tk_root() path = filedialog.askopenfilename( title="Load Session Log", initialdir="logs", filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] ) root.destroy() if not path: return entries = [] try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: continue except Exception as e: self._set_status(f"log load error: {e}") return self.prior_session_entries = entries self.is_viewing_prior_session = True self._set_status(f"viewing prior session: {Path(path).name} ({len(entries)} entries)") def _load_active_project(self) -> None: """Loads the active project configuration, with fallbacks.""" if self.active_project_path and Path(self.active_project_path).exists(): try: self.project = project_manager.load_project(self.active_project_path) return except Exception as e: print(f"Failed to load project {self.active_project_path}: {e}") for pp in self.project_paths: if Path(pp).exists(): try: self.project = project_manager.load_project(pp) self.active_project_path = pp return except Exception: continue self.project = project_manager.migrate_from_legacy_config(self.config) name = self.project.get("project", {}).get("name", "project") fallback_path = f"{name}.toml" project_manager.save_project(self.project, fallback_path) self.active_project_path = fallback_path if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" def run_prune() -> None: try: from src import log_registry from src import log_pruner registry = log_registry.LogRegistry("logs/log_registry.toml") pruner = log_pruner.LogPruner(registry, "logs") pruner.prune() except Exception as e: print(f"Error during log pruning: {e}") thread = threading.Thread(target=run_prune, daemon=True) thread.start() def _fetch_models(self, provider: str) -> None: self._set_status("fetching models...") def do_fetch() -> None: try: models_list = ai_client.list_models(provider) self.available_models = models_list if self.current_model not in models_list and models_list: self.current_model = models_list[0] ai_client.set_provider(self._current_provider, self.current_model) self._set_status(f"models loaded: {len(models_list)}") except Exception as e: self._set_status(f"model fetch error: {e}") self.models_thread = threading.Thread(target=do_fetch, daemon=True) self.models_thread.start() def start_services(self, app: Any = None): """Starts background threads.""" sys.stderr.write("[DEBUG] AppController.start_services called\n") sys.stderr.flush() self._prune_old_logs() self._init_ai_and_hooks(app) self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() sys.stderr.write(f"[DEBUG] _loop_thread started: {self._loop_thread.ident}\n") sys.stderr.flush() def shutdown(self) -> None: """Stops background threads and cleans up resources.""" from src import ai_client ai_client.cleanup() if hasattr(self, 'hook_server') and self.hook_server: self.hook_server.stop() self.event_queue.put("shutdown", None) if self._loop_thread and self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) def _init_ai_and_hooks(self, app: Any = None) -> None: from src import api_hooks ai_client.set_provider(self._current_provider, self._current_model) if self._current_provider == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path ai_client.confirm_and_run_callback = self._confirm_and_run ai_client.comms_log_callback = self._on_comms_entry ai_client.tool_log_callback = self._on_tool_log mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics self.perf_monitor.alert_callback = self._on_performance_alert ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw)) ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw)) ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw)) self.hook_server = api_hooks.HookServer(app if app else self) self.hook_server.start() def _run_event_loop(self): """Internal loop runner.""" def queue_fallback() -> None: while True: try: if hasattr(self, '_process_pending_gui_tasks'): self._process_pending_gui_tasks() if hasattr(self, '_process_pending_history_adds'): self._process_pending_history_adds() except: pass time.sleep(0.1) fallback_thread = threading.Thread(target=queue_fallback, daemon=True) fallback_thread.start() self._process_event_queue() def _process_event_queue(self) -> None: """Listens for and processes events from the SyncEventQueue.""" sys.stderr.write("[DEBUG] _process_event_queue entered\n") sys.stderr.flush() def tick_perf(): while True: self.perf_monitor.start_frame() time.sleep(0.01) # Measurable frame time self.perf_monitor.end_frame() time.sleep(0.006) # Aim for ~60 FPS total threading.Thread(target=tick_perf, daemon=True).start() while True: event_name, payload = self.event_queue.get() sys.stderr.write(f"[DEBUG] _process_event_queue got event: {event_name} with payload: {str(payload)[:100]}\n") sys.stderr.flush() if event_name == "shutdown": break if event_name == "user_request": threading.Thread(target=self._handle_request_event, args=(payload,), daemon=True).start() elif event_name == "gui_task": with self._pending_gui_tasks_lock: # Directly append the task from the hook server. # It already contains 'action' and any necessary fields. self._pending_gui_tasks.append(payload) elif event_name == "mma_state_update": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_state_update", "payload": payload }) elif event_name == "mma_stream": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_stream_append", "payload": payload }) elif event_name in ("mma_spawn_approval", "mma_step_approval"): with self._pending_gui_tasks_lock: # These payloads already contain the 'action' field self._pending_gui_tasks.append(payload) elif event_name == "response": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": payload }) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": "response", "payload": payload}) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """Processes a UserRequestEvent by calling the AI client.""" ai_client.current_tier = None # Ensure main discussion is untagged if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": event.prompt, "collapsed": False, "ts": project_manager.now_ts() }) # Clear response area for new turn self.ai_response = "" csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) # Force update adapter path right before send to bypass potential duplication issues self._update_gcli_adapter(self.ui_gemini_cli_path) sys.stderr.write(f"[DEBUG] Calling ai_client.send with provider={ai_client.get_provider()}, model={self.current_model}, gcli_path={self.ui_gemini_cli_path}\n") sys.stderr.flush() try: resp = ai_client.send( event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text, stream=True, stream_callback=lambda text: self._on_ai_stream(text), pre_tool_callback=self._confirm_and_run, qa_callback=ai_client.run_tier4_analysis ) self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}) except ai_client.ProviderError as e: sys.stderr.write(f"[DEBUG] _handle_request_event ai_client.ProviderError: {e.ui_message()}\n") sys.stderr.flush() self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}) except Exception as e: import traceback sys.stderr.write(f"[DEBUG] _handle_request_event ERROR: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}) def _on_ai_stream(self, text: str) -> None: """Handles streaming text from the AI.""" self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}) def _on_comms_entry(self, entry: Dict[str, Any]) -> None: session_logger.log_comms(entry) entry["local_ts"] = time.time() kind = entry.get("kind") payload = entry.get("payload", {}) if kind in ("tool_result", "tool_call"): role = "Tool" if kind == "tool_result" else "Vendor API" content = "" if kind == "tool_result": content = payload.get("output", "") else: content = payload.get("script") or payload.get("args") or payload.get("message", "") if isinstance(content, dict): content = json.dumps(content, indent=1) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", "collapsed": True, "ts": entry.get("ts", project_manager.now_ts()) }) if kind == "history_add": payload = entry.get("payload", {}) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": payload.get("role", "AI"), "content": payload.get("content", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) return with self._pending_comms_lock: self._pending_comms.append(entry) def _on_tool_log(self, script: str, result: str) -> None: session_logger.log_tool_call(script, result, None) source_tier = ai_client.current_tier with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None: payload = kwargs.get("payload", {}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload}) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": event_name, "payload": payload}) def _on_performance_alert(self, message: str) -> None: alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "System", "content": alert_text, "ts": project_manager.now_ts() }) def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]: sys.stderr.write(f"[DEBUG] _confirm_and_run called. test_hooks={self.test_hooks_enabled}, manual_approve={getattr(self, 'ui_manual_approve', False)}\n") sys.stderr.flush() if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): sys.stderr.write("[DEBUG] Auto-approving script.\n") sys.stderr.flush() self._set_status("running powershell...") output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) self._append_tool_log(script, output) self._set_status("powershell done, awaiting AI...") return output sys.stderr.write("[DEBUG] Creating ConfirmDialog.\n") sys.stderr.flush() dialog = ConfirmDialog(script, base_dir) is_headless = "--headless" in sys.argv if is_headless: with self._pending_dialog_lock: self._pending_actions[dialog._uid] = dialog else: with self._pending_dialog_lock: self._pending_dialog = dialog if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): with self._api_event_queue_lock: self._api_event_queue.append({ "type": "script_confirmation_required", "action_id": dialog._uid, "script": str(script), "base_dir": str(base_dir), "ts": time.time() }) sys.stderr.write(f"[DEBUG] Appended script_confirmation_required to _api_event_queue. ID={dialog._uid}\n") sys.stderr.flush() sys.stderr.write(f"[DEBUG] Waiting for dialog ID={dialog._uid}...\n") sys.stderr.flush() approved, final_script = dialog.wait() sys.stderr.write(f"[DEBUG] Dialog ID={dialog._uid} finished wait. approved={approved}\n") sys.stderr.flush() if is_headless: with self._pending_dialog_lock: if dialog._uid in self._pending_actions: del self._pending_actions[dialog._uid] if not approved: self._append_tool_log(final_script, "REJECTED by user") return None self._set_status("running powershell...") output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) self._append_tool_log(final_script, output) self._set_status("powershell done, awaiting AI...") return output def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None: self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) self.ui_last_script_text = script self.ui_last_script_output = result self._trigger_script_blink = True self.show_script_output = True if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True def resolve_pending_action(self, action_id: str, approved: bool) -> bool: with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True elif self._pending_dialog and self._pending_dialog._uid == action_id: dialog = self._pending_dialog with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True return False @property def current_provider(self) -> str: return self._current_provider @current_provider.setter def current_provider(self, value: str) -> None: if value != self._current_provider: self._current_provider = value ai_client.reset_session() ai_client.set_provider(value, self.current_model) self._token_stats = {} self._token_stats_dirty = True @property def current_model(self) -> str: return self._current_model @current_model.setter def current_model(self, value: str) -> None: if value != self._current_model: self._current_model = value ai_client.reset_session() ai_client.set_provider(self.current_provider, value) self._token_stats = {} self._token_stats_dirty = True def create_api(self) -> FastAPI: """Creates and configures the FastAPI application for headless mode.""" api = FastAPI(title="Manual Slop Headless API") API_KEY_NAME = "X-API-KEY" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(header_key: str = Depends(api_key_header)) -> str: """Validates the API key from the request header against configuration.""" headless_cfg = self.config.get("headless", {}) config_key = headless_cfg.get("api_key", "").strip() env_key = os.environ.get("SLOP_API_KEY", "").strip() target_key = env_key or config_key if not target_key: raise HTTPException(status_code=403, detail="API Key not configured on server") if header_key == target_key: return header_key raise HTTPException(status_code=403, detail="Could not validate API Key") @api.get("/health") def health() -> dict[str, str]: """Returns the health status of the API.""" return {"status": "ok"} @api.get("/api/gui/state", dependencies=[Depends(get_api_key)]) def get_gui_state() -> dict[str, Any]: """Returns the current GUI state for specific fields.""" gettable = getattr(self, "_gettable_fields", {}) state = {} import dataclasses for key, attr in gettable.items(): val = getattr(self, attr, None) if dataclasses.is_dataclass(val): state[key] = dataclasses.asdict(val) else: state[key] = val return state @api.get("/api/gui/mma_status", dependencies=[Depends(get_api_key)]) def get_mma_status() -> dict[str, Any]: """Dedicated endpoint for MMA-related status.""" return { "mma_status": self.mma_status, "ai_status": self.ai_status, "mma_streams": self.mma_streams, "active_tier": self.active_tier, "active_tickets": self.active_tickets, "proposed_tracks": self.proposed_tracks } @api.post("/api/gui", dependencies=[Depends(get_api_key)]) def post_gui(req: dict) -> dict[str, str]: """Pushes a GUI task to the event queue.""" self.event_queue.put("gui_task", req) return {"status": "queued"} @api.get("/api/session", dependencies=[Depends(get_api_key)]) def get_api_session() -> dict[str, Any]: """Returns current discussion session entries.""" with self._disc_entries_lock: return {"session": {"entries": self.disc_entries}} @api.post("/api/session", dependencies=[Depends(get_api_key)]) def post_api_session(req: dict) -> dict[str, str]: """Updates session entries.""" entries = req.get("entries", []) with self._disc_entries_lock: self.disc_entries = entries return {"status": "updated"} @api.get("/api/project", dependencies=[Depends(get_api_key)]) def get_api_project() -> dict[str, Any]: """Returns current project data.""" return {"project": self.project} @api.get("/api/performance", dependencies=[Depends(get_api_key)]) def get_performance() -> dict[str, Any]: """Returns performance monitor metrics.""" return {"performance": self.perf_monitor.get_metrics()} @api.get("/api/gui/diagnostics", dependencies=[Depends(get_api_key)]) def get_diagnostics() -> dict[str, Any]: """Alias for performance metrics.""" return self.perf_monitor.get_metrics() @api.get("/status", dependencies=[Depends(get_api_key)]) def status() -> dict[str, Any]: """Returns the current status of the application.""" return { "provider": self.current_provider, "model": self.current_model, "status": self.ai_status, "usage": self.session_usage } @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) def generate(req: GenerateRequest) -> dict[str, Any]: """Triggers an AI generation request using the current project context.""" if not req.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") with self._send_thread_lock: start_time = time.time() try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") user_msg = req.prompt base_dir = self.ui_files_base_dir csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) temp = req.temperature if req.temperature is not None else self.temperature tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens ai_client.set_model_params(temp, tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts() }) try: resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts() }) self._recalculate_session_usage() duration = time.time() - start_time return { "text": resp, "metadata": { "provider": self.current_provider, "model": self.current_model, "duration_sec": round(duration, 3), "timestamp": project_manager.now_ts() }, "usage": self.session_usage } except ai_client.ProviderError as e: raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") except Exception as e: raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) async def stream(req: GenerateRequest) -> Any: """Placeholder for streaming AI generation responses (Not yet implemented).""" raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) def pending_actions() -> list[dict[str, Any]]: """Lists all pending PowerShell scripts awaiting confirmation.""" with self._pending_dialog_lock: return [ {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} for uid, diag in self._pending_actions.items() ] @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: """Approves or rejects a pending action.""" with self._pending_dialog_lock: if action_id not in self._pending_actions: raise HTTPException(status_code=404, detail="Action not found") dialog = self._pending_actions.pop(action_id) if req.script is not None: dialog._script = req.script with dialog._condition: dialog._approved = req.approved dialog._done = True dialog._condition.notify_all() return {"status": "confirmed" if req.approved else "rejected"} @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) def list_sessions() -> list[str]: """Lists all session log files.""" log_dir = Path("logs") if not log_dir.exists(): return [] return [f.name for f in log_dir.glob("*.log")] @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def get_session(session_id: str) -> dict[str, Any]: """Returns the content of a specific session log.""" log_path = Path("logs") / session_id if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def delete_session(session_id: str) -> dict[str, str]: """Deletes a specific session log.""" log_path = Path("logs") / session_id if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") log_path.unlink() return {"status": "deleted"} @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) def get_context() -> dict[str, Any]: """Returns the current aggregated project context.""" try: md, path, file_items, stable_md, disc_text = self._do_generate() # Pull current screenshots if available in project screenshots = self.project.get("screenshots", {}).get("paths", []) return { "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], "screenshots": screenshots, "files_base_dir": self.ui_files_base_dir, "markdown": md, "discussion": disc_text } except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) def token_stats() -> dict[str, Any]: """Returns current token usage and budget statistics.""" return self._token_stats return api def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem proj = project_manager.default_project(name) project_manager.save_project(proj, user_data) if user_data not in self.project_paths: self.project_paths.append(user_data) self._switch_project(user_data) def _cb_project_save(self) -> None: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self._set_status("config saved") def _cb_disc_create(self) -> None: nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm) self.ui_disc_new_name_input = "" def _switch_project(self, path: str) -> None: if not Path(path).exists(): self._set_status(f"project file not found: {path}") return self._flush_to_project() self._save_active_project() try: self.project = project_manager.load_project(path) self.active_project_path = path except Exception as e: self._set_status(f"failed to load project: {e}") return self._refresh_from_project() self._set_status(f"switched to: {Path(path).stem}") def _refresh_from_project(self) -> None: self.files = list(self.project.get("files", {}).get("paths", [])) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", list(models.DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) proj = self.project self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_project_main_context = proj_meta.get("main_context", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) self.ui_summary_only = proj.get("project", {}).get("summary_only", False) agent_tools_cfg = proj.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} # MMA Tracks self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) # Restore MMA state mma_sec = proj.get("mma", {}) self.ui_epic_input = mma_sec.get("epic", "") at_data = mma_sec.get("active_track") if at_data: try: tickets = [] for t_data in at_data.get("tickets", []): tickets.append(models.Ticket(**t_data)) self.active_track = models.Track( id=at_data.get("id"), description=at_data.get("description"), tickets=tickets ) self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table except Exception as e: print(f"Failed to deserialize active track: {e}") self.active_track = None else: self.active_track = None self.active_tickets = [] # Load track-scoped history if track is active if self.active_track: track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) if track_history: with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(track_history, self.disc_roles) def _cb_load_track(self, track_id: str) -> None: state = project_manager.load_track_state(track_id, self.ui_files_base_dir) if state: try: # Convert list[Ticket] or list[dict] to list[Ticket] for Track object tickets = [] for t in state.tasks: if isinstance(t, dict): tickets.append(models.Ticket(**t)) else: tickets.append(t) self.active_track = models.Track( id=state.metadata.id, description=state.metadata.name, tickets=tickets ) # Keep dicts for UI table (or convert models.Ticket objects back to dicts if needed) self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] # Load track-scoped history history = project_manager.load_track_history(track_id, self.ui_files_base_dir) with self._disc_entries_lock: if history: self.disc_entries = models.parse_history_entries(history, self.disc_roles) else: self.disc_entries = [] self._recalculate_session_usage() self._set_status(f"Loaded track: {state.metadata.name}") except Exception as e: self._set_status(f"Load track error: {e}") print(f"Error loading track {track_id}: {e}") def _save_active_project(self) -> None: if self.active_project_path: try: project_manager.save_project(self.project, self.active_project_path) except Exception as e: self._set_status(f"save error: {e}") def _get_discussion_names(self) -> list[str]: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) return sorted(discussions.keys()) def _switch_discussion(self, name: str) -> None: self._flush_disc_entries_to_project() disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if name not in discussions: self._set_status(f"discussion not found: {name}") return self.active_discussion = name self._track_discussion_active = False disc_sec["active"] = name disc_data = discussions[name] with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) self._set_status(f"discussion: {name}") def _flush_disc_entries_to_project(self) -> None: history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] if self.active_track and self._track_discussion_active: project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) return disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) disc_data["history"] = history_strings disc_data["last_updated"] = project_manager.now_ts() def _create_discussion(self, name: str) -> None: disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) if name in discussions: self._set_status(f"discussion '{name}' already exists") return discussions[name] = project_manager.default_discussion() self._switch_discussion(name) def _rename_discussion(self, old_name: str, new_name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if old_name not in discussions: return if new_name in discussions: self._set_status(f"discussion '{new_name}' already exists") return discussions[new_name] = discussions.pop(old_name) if self.active_discussion == old_name: self.active_discussion = new_name disc_sec["active"] = new_name def _delete_discussion(self, name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if len(discussions) <= 1: self._set_status("cannot delete the last discussion") return if name not in discussions: return del discussions[name] if self.active_discussion == name: remaining = sorted(discussions.keys()) self._switch_discussion(remaining[0]) def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: if self._pending_mma_approval: dlg = self._pending_mma_approval.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved if payload is not None: dlg._payload = payload dlg._done = True dlg._condition.notify_all() self._pending_mma_approval = None if self._pending_mma_spawn: spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0] if spawn_dlg: with spawn_dlg._condition: spawn_dlg._approved = approved spawn_dlg._abort = abort if prompt is not None: spawn_dlg._prompt = prompt if context_md is not None: spawn_dlg._context_md = context_md spawn_dlg._done = True spawn_dlg._condition.notify_all() self._pending_mma_spawn = None def _handle_approve_ask(self) -> None: """Responds with approval for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": True}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reject_ask(self) -> None: """Responds with rejection for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": False}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reset_session(self) -> None: """Logic for resetting the AI session and GUI state.""" ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.disc_entries.clear() # Clear history in ALL discussions to be safe disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) for d_name in discussions: discussions[d_name]["history"] = [] self._set_status("session reset") self.ai_response = "" self.ui_ai_input = "" self.ui_manual_approve = False self.ui_auto_add_history = False self._current_provider = "gemini" self._current_model = "gemini-2.5-flash-lite" ai_client.set_provider(self._current_provider, self._current_model) with self._pending_history_adds_lock: self._pending_history_adds.clear() with self._api_event_queue_lock: self._api_event_queue.clear() with self._pending_gui_tasks_lock: self._pending_gui_tasks.clear() def _handle_md_only(self) -> None: """Logic for the 'MD Only' action.""" def worker(): try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self._set_status(f"md written: {path.name}") # Refresh token budget metrics with CURRENT md self._refresh_api_metrics({}, md_content=md) except Exception as e: self._set_status(f"error: {e}") threading.Thread(target=worker, daemon=True).start() def _handle_generate_send(self) -> None: """Logic for the 'Gen + Send' action.""" def worker(): sys.stderr.write("[DEBUG] _handle_generate_send worker started\n") sys.stderr.flush() try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items self._set_status("sending...") user_msg = self.ui_ai_input base_dir = self.ui_files_base_dir sys.stderr.write(f"[DEBUG] _do_generate success. Prompt: {user_msg[:50]}...\n") sys.stderr.flush() # Prepare event payload event_payload = events.UserRequestEvent( prompt=user_msg, stable_md=stable_md, file_items=file_items, disc_text=disc_text, base_dir=base_dir ) # Push to async queue self.event_queue.put("user_request", event_payload) sys.stderr.write("[DEBUG] Enqueued user_request event\n") sys.stderr.flush() except Exception as e: import traceback sys.stderr.write(f"[DEBUG] _do_generate ERROR: {e}\n{traceback.format_exc()}\n") sys.stderr.flush() self._set_status(f"generate error: {e}") threading.Thread(target=worker, daemon=True).start() def _recalculate_session_usage(self) -> None: usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} for entry in ai_client.get_comms_log(): if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): u = entry["payload"]["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in usage: usage[k] += u.get(k, 0) or 0 self.session_usage = usage def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: if "latency" in payload: self.session_usage["last_latency"] = payload["latency"] self._recalculate_session_usage() if md_content is not None: stats = ai_client.get_token_stats(md_content) # Ensure compatibility if keys are named differently if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: stats["estimated_prompt_tokens"] = stats["total_tokens"] self._token_stats = stats cache_stats = payload.get("cache_stats") if cache_stats: count = cache_stats.get("cache_count", 0) size_bytes = cache_stats.get("total_size_bytes", 0) self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" def _flush_to_project(self) -> None: proj = self.project proj.setdefault("output", {})["output_dir"] = self.ui_output_dir proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir proj["files"]["paths"] = self.files proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir proj["screenshots"]["paths"] = self.screenshots proj.setdefault("project", {}) proj["project"]["git_dir"] = self.ui_project_git_dir proj["project"]["system_prompt"] = self.ui_project_system_prompt proj["project"]["main_context"] = self.ui_project_main_context proj["project"]["word_wrap"] = self.ui_word_wrap proj["project"]["summary_only"] = self.ui_summary_only proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path proj.setdefault("agent", {}).setdefault("tools", {}) for t_name in models.AGENT_TOOL_NAMES: proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) self._flush_disc_entries_to_project() disc_sec = proj.setdefault("discussion", {}) disc_sec["roles"] = self.disc_roles disc_sec["active"] = self.active_discussion disc_sec["auto_add"] = self.ui_auto_add_history # Save MMA State mma_sec = proj.setdefault("mma", {}) mma_sec["epic"] = self.ui_epic_input if self.active_track: mma_sec["active_track"] = asdict(self.active_track) else: mma_sec["active_track"] = None def _flush_to_config(self) -> None: self.config["ai"] = { "provider": self.current_provider, "model": self.current_model, "temperature": self.temperature, "max_tokens": self.max_tokens, "history_trunc_limit": self.history_trunc_limit, } self.config["ai"]["system_prompt"] = self.ui_global_system_prompt self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} self.config["gui"] = {"show_windows": self.show_windows} theme.save_to_config(self.config) def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) track_id = self.active_track.id if self.active_track else None flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) full_md, path, file_items = aggregate.run(flat) # Build stable markdown (no history) for Gemini caching screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) screenshots = flat.get("screenshots", {}).get("paths", []) summary_only = flat.get("project", {}).get("summary_only", False) stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) # Build discussion history text separately history = flat.get("discussion", {}).get("history", []) discussion_text = aggregate.build_discussion_text(history) return full_md, path, file_items, stable_md, discussion_text def _cb_plan_epic(self) -> None: def _bg_task() -> None: sys.stderr.write("[DEBUG] _cb_plan_epic _bg_task started\n") sys.stderr.flush() try: self._set_status("Planning Epic (Tier 1)...") history = orchestrator_pm.get_track_history_summary() sys.stderr.write(f"[DEBUG] History summary length: {len(history)}\n") sys.stderr.flush() proj = project_manager.load_project(self.active_project_path) flat = project_manager.flat_config(proj) file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", [])) _t1_baseline = len(ai_client.get_comms_log()) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) _t1_new = ai_client.get_comms_log()[_t1_baseline:] _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp) _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp) def _push_t1_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 1"]["input"] += i self.mma_tier_usage["Tier 1"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t1_usage, "args": [_t1_in, _t1_out] }) self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": { "text": json.dumps(tracks, indent=2), "stream_id": "Tier 1", "status": "Epic tracks generated." } }) self._pending_gui_tasks.append({ "action": "show_track_proposal", "payload": tracks }) except Exception as e: self._set_status(f"Epic plan error: {e}") print(f"ERROR in _cb_plan_epic background task: {e}") threading.Thread(target=_bg_task, daemon=True).start() def _cb_accept_tracks(self) -> None: self._show_track_proposal_modal = False def _bg_task() -> None: sys.stderr.write("[DEBUG] _cb_accept_tracks _bg_task started\n") # Generate skeletons once self._set_status("Phase 2: Generating skeletons for all tracks...") sys.stderr.write("[DEBUG] Creating ASTParser...\n") parser = ASTParser(language="python") generated_skeletons = "" try: # Use a local copy of files to avoid concurrent modification issues files_to_scan = list(self.files) sys.stderr.write(f"[DEBUG] Scanning {len(files_to_scan)} files for skeletons...\n") for i, file_path in enumerate(files_to_scan): try: self._set_status(f"Phase 2: Scanning files ({i+1}/{len(files_to_scan)})...") abs_path = Path(self.ui_files_base_dir) / file_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() generated_skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n" except Exception as e: sys.stderr.write(f"[DEBUG] Error parsing skeleton for {file_path}: {e}\n") except Exception as e: sys.stderr.write(f"[DEBUG] Error in scan loop: {e}\n") self._set_status(f"Error generating skeletons: {e}") return # Exit if skeleton generation fails sys.stderr.write("[DEBUG] Skeleton generation complete. Starting tracks...\n") # Now loop through tracks and call _start_track_logic with generated skeletons total_tracks = len(self.proposed_tracks) for i, track_data in enumerate(self.proposed_tracks): title = track_data.get("title") or track_data.get("goal", "Untitled Track") self._set_status(f"Processing track {i+1} of {total_tracks}: '{title}'...") self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons sys.stderr.write("[DEBUG] All tracks started. Refreshing...\n") with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started self._set_status(f"All {total_tracks} tracks accepted and execution started.") threading.Thread(target=_bg_task, daemon=True).start() def _cb_start_track(self, user_data: Any = None) -> None: if isinstance(user_data, str): # If track_id is provided directly track_id = user_data # Ensure it's loaded as active if not self.active_track or self.active_track.id != track_id: self._cb_load_track(track_id) if self.active_track: # Use the active track object directly to start execution self._set_mma_status("running") engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) full_md, _, _ = aggregate.run(flat) threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() self._set_status(f"Track '{self.active_track.description}' started.") return idx = 0 if isinstance(user_data, int): idx = user_data elif isinstance(user_data, dict): idx = user_data.get("index", 0) if 0 <= idx < len(self.proposed_tracks): track_data = self.proposed_tracks[idx] title = track_data.get("title") or track_data.get("goal", "Untitled Track") threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() self._set_status(f"Track '{title}' started.") def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None: try: goal = track_data.get("goal", "") title = track_data.get("title") or track_data.get("goal", "Untitled Track") self._set_status(f"Phase 2: Generating tickets for {title}...") skeletons = skeletons_str or "" # Use provided skeletons or empty self._set_status("Phase 2: Calling Tech Lead...") _t2_baseline = len(ai_client.get_comms_log()) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) _t2_new = ai_client.get_comms_log()[_t2_baseline:] _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) def _push_t2_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 2"]["input"] += i self.mma_tier_usage["Tier 2"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t2_usage, "args": [_t2_in, _t2_out] }) if not raw_tickets: self._set_status(f"Error: No tickets generated for track: {title}") print(f"Warning: No tickets generated for track: {title}") return self._set_status("Phase 2: Sorting tickets...") try: sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) except ValueError as e: print(f"Dependency error in track '{title}': {e}") sorted_tickets_data = raw_tickets # 3. Create Track and Ticket objects tickets = [] for t_data in sorted_tickets_data: ticket = models.Ticket( id=t_data["id"], description=t_data.get("description") or t_data.get("goal", "No description"), status=t_data.get("status", "todo"), assigned_to=t_data.get("assigned_to", "unassigned"), depends_on=t_data.get("depends_on", []), step_mode=t_data.get("step_mode", False) ) tickets.append(ticket) track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" track = models.Track(id=track_id, description=title, tickets=tickets) # Initialize track state in the filesystem meta = models.Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now()) state = models.TrackState(metadata=meta, discussion=[], tasks=tickets) project_manager.save_track_state(track_id, state, self.ui_files_base_dir) # Add to memory and notify UI self.tracks.append({"id": track_id, "title": title, "status": "todo"}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # 4. Initialize ConductorEngine and run loop engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) # Use current full markdown context for the track execution track_id_param = track.id flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) full_md, _, _ = aggregate.run(flat) # Start the engine in a separate thread threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() except Exception as e: self._set_status(f"Track start error: {e}") print(f"ERROR in _start_track_logic: {e}") def _cb_ticket_retry(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'todo' break self.event_queue.put("mma_retry", {"ticket_id": ticket_id}) def _cb_ticket_skip(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'skipped' break self.event_queue.put("mma_skip", {"ticket_id": ticket_id}) def _cb_run_conductor_setup(self) -> None: base = Path("conductor") if not base.exists(): self.ui_conductor_setup_summary = "Error: conductor/ directory not found." return files = list(base.glob("**/*")) files = [f for f in files if f.is_file()] summary = [f"Conductor Directory: {base.absolute()}"] summary.append(f"Total Files: {len(files)}") total_lines = 0 for f in files: try: with open(f, "r", encoding="utf-8") as fd: lines = len(fd.readlines()) total_lines += lines summary.append(f"- {f.relative_to(base)}: {lines} lines") except Exception: summary.append(f"- {f.relative_to(base)}: Error reading") summary.append(f"Total Line Count: {total_lines}") tracks_dir = base / "tracks" if tracks_dir.exists(): tracks = [d for d in tracks_dir.iterdir() if d.is_dir()] summary.append(f"Total Tracks Found: {len(tracks)}") else: summary.append("Tracks Directory: Not found") self.ui_conductor_setup_summary = "\n".join(summary) def _cb_create_track(self, name: str, desc: str, track_type: str) -> None: if not name: return date_suffix = datetime.now().strftime("%Y%m%d") track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}" track_dir = Path("conductor/tracks") / track_id track_dir.mkdir(parents=True, exist_ok=True) spec_file = track_dir / "spec.md" with open(spec_file, "w", encoding="utf-8") as f: f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n") plan_file = track_dir / "plan.md" with open(plan_file, "w", encoding="utf-8") as f: f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n") meta_file = track_dir / "metadata.json" with open(meta_file, "w", encoding="utf-8") as f: json.dump({ "id": track_id, "title": name, "description": desc, "type": track_type, "status": "new", "progress": 0.0 }, f, indent=1) # Refresh tracks from disk self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) def _push_mma_state_update(self) -> None: if not self.active_track: return # Sync active_tickets (list of dicts) back to active_track.tickets (list of models.Ticket objects) self.active_track.tickets = [models.Ticket.from_dict(t) for t in self.active_tickets] # Save the state to disk existing = project_manager.load_track_state(self.active_track.id, self.ui_files_base_dir) meta = models.Metadata( id=self.active_track.id, name=self.active_track.description, status=self.mma_status, created_at=existing.metadata.created_at if existing else datetime.now(), updated_at=datetime.now() ) state = models.TrackState( metadata=meta, discussion=existing.discussion if existing else [], tasks=self.active_track.tickets ) project_manager.save_track_state(self.active_track.id, state, self.ui_files_base_dir)