diff --git a/src/app_controller.py b/src/app_controller.py index 932a01f..367d8ea 100644 --- a/src/app_controller.py +++ b/src/app_controller.py @@ -3,14 +3,106 @@ import threading import time import sys import os -from typing import Any, List, Dict, Optional, Tuple +from typing import Any, List, Dict, Optional, Tuple, Callable from pathlib import Path +import json +import uuid +import tomli_w +import requests +from dataclasses import asdict +from datetime import datetime +from tkinter import filedialog, Tk + +from fastapi import FastAPI, Depends, HTTPException +from fastapi.security.api_key import APIKeyHeader +from pydantic import BaseModel from src import events from src import session_logger from src import project_manager from src.performance_monitor import PerformanceMonitor -from src.models import Track, Ticket, load_config, parse_history_entries, DISC_ROLES, AGENT_TOOL_NAMES +from src.models import Track, Ticket, load_config, parse_history_entries, DISC_ROLES, AGENT_TOOL_NAMES, CONFIG_PATH +from src.log_registry import LogRegistry +from src.log_pruner import LogPruner +from src.file_cache import ASTParser +import ai_client +import shell_runner +import mcp_client +import aggregate +import orchestrator_pm +import conductor_tech_lead +import cost_tracker +import multi_agent_conductor +from src import theme +from ai_client import ProviderError + +def save_config(config: dict[str, Any]) -> None: + with open(CONFIG_PATH, "wb") as f: + tomli_w.dump(config, f) + +def hide_tk_root() -> Tk: + root = Tk() + root.withdraw() + root.wm_attributes("-topmost", True) + return root + +class GenerateRequest(BaseModel): + prompt: str + auto_add_history: bool = True + temperature: float | None = None + max_tokens: int | None = None + +class ConfirmRequest(BaseModel): + approved: bool + script: Optional[str] = None + +class ConfirmDialog: + def __init__(self, script: str, base_dir: str) -> None: + self._uid = str(uuid.uuid4()) + self._script = str(script) if script is not None 
else "" + self._base_dir = str(base_dir) if base_dir is not None else "" + self._condition = threading.Condition() + self._done = False + self._approved = False + + def wait(self) -> tuple[bool, str]: + with self._condition: + while not self._done: + self._condition.wait(timeout=0.1) + return self._approved, self._script + +class MMAApprovalDialog: + def __init__(self, ticket_id: str, payload: str) -> None: + self._payload = payload + self._condition = threading.Condition() + self._done = False + self._approved = False + + def wait(self) -> tuple[bool, str]: + with self._condition: + while not self._done: + self._condition.wait(timeout=0.1) + return self._approved, self._payload + +class MMASpawnApprovalDialog: + def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: + self._prompt = prompt + self._context_md = context_md + self._condition = threading.Condition() + self._done = False + self._approved = False + self._abort = False + + def wait(self) -> dict[str, Any]: + with self._condition: + while not self._done: + self._condition.wait(timeout=0.1) + return { + 'approved': self._approved, + 'abort': self._abort, + 'prompt': self._prompt, + 'context_md': self._context_md + } class AppController: """ @@ -69,6 +161,12 @@ class AppController: self.perf_monitor: PerformanceMonitor = PerformanceMonitor() self._pending_gui_tasks: List[Dict[str, Any]] = [] + # Pending dialogs state moved from App + self._pending_dialog: Optional[ConfirmDialog] = None + self._pending_dialog_open: bool = False + self._pending_actions: Dict[str, ConfirmDialog] = {} + self._pending_ask_dialog: bool = False + # AI settings state self._current_provider: str = "gemini" self._current_model: str = "gemini-2.5-flash-lite" @@ -233,6 +331,33 @@ class AppController: label = self.project.get("project", {}).get("name", "") session_logger.open_session(label=label) + def cb_load_prior_log(self) -> None: + root = hide_tk_root() + path = filedialog.askopenfilename( + 
title="Load Session Log", + initialdir="logs", + filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] + ) + root.destroy() + if not path: + return + entries = [] + try: + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + except Exception as e: + self.ai_status = f"log load error: {e}" + return + self.prior_session_entries = entries + self.is_viewing_prior_session = True + self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)" + def _load_active_project(self) -> None: """Loads the active project configuration, with fallbacks.""" if self.active_project_path and Path(self.active_project_path).exists(): @@ -257,20 +382,1247 @@ class AppController: if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) + def _prune_old_logs(self) -> None: + """Asynchronously prunes old insignificant logs on startup.""" + def run_prune() -> None: + try: + registry = LogRegistry("logs/log_registry.toml") + pruner = LogPruner(registry, "logs") + pruner.prune() + except Exception as e: + print(f"Error during log pruning: {e}") + thread = threading.Thread(target=run_prune, daemon=True) + thread.start() + + def _fetch_models(self, provider: str) -> None: + self.ai_status = "fetching models..." 
+ def do_fetch() -> None: + try: + models = ai_client.list_models(provider) + self.available_models = models + if self.current_model not in models and models: + self.current_model = models[0] + ai_client.set_provider(self._current_provider, self.current_model) + self.ai_status = f"models loaded: {len(models)}" + except Exception as e: + self.ai_status = f"model fetch error: {e}" + self.models_thread = threading.Thread(target=do_fetch, daemon=True) + self.models_thread.start() + def start_services(self): """Starts background threads and async event loop.""" + self._prune_old_logs() + self._init_ai_and_hooks() self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() + def _init_ai_and_hooks(self) -> None: + import api_hooks + ai_client.set_provider(self._current_provider, self._current_model) + if self._current_provider == "gemini_cli": + if not ai_client._gemini_cli_adapter: + ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) + else: + ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path + ai_client.confirm_and_run_callback = self._confirm_and_run + ai_client.comms_log_callback = self._on_comms_entry + ai_client.tool_log_callback = self._on_tool_log + mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics + self.perf_monitor.alert_callback = self._on_performance_alert + ai_client.events.on("request_start", self._on_api_event) + ai_client.events.on("response_received", self._on_api_event) + ai_client.events.on("tool_execution", self._on_api_event) + + self._settable_fields: Dict[str, str] = { + 'ai_input': 'ui_ai_input', + 'project_git_dir': 'ui_project_git_dir', + 'auto_add_history': 'ui_auto_add_history', + 'disc_new_name_input': 'ui_disc_new_name_input', + 'project_main_context': 'ui_project_main_context', + 'gcli_path': 'ui_gemini_cli_path', + 'output_dir': 'ui_output_dir', + 'files_base_dir': 
'ui_files_base_dir', + 'ai_status': 'ai_status', + 'ai_response': 'ai_response', + 'active_discussion': 'active_discussion', + 'current_provider': 'current_provider', + 'current_model': 'current_model', + 'token_budget_pct': '_token_budget_pct', + 'token_budget_current': '_token_budget_current', + 'token_budget_label': '_token_budget_label', + 'show_confirm_modal': 'show_confirm_modal', + 'mma_epic_input': 'ui_epic_input', + 'mma_status': 'mma_status', + 'mma_active_tier': 'active_tier', + 'ui_new_track_name': 'ui_new_track_name', + 'ui_new_track_desc': 'ui_new_track_desc', + 'manual_approve': 'ui_manual_approve' + } + + self.hook_server = api_hooks.HookServer(self) + self.hook_server.start() + def _run_event_loop(self): """Internal loop runner.""" asyncio.set_event_loop(self._loop) + self._loop.create_task(self._process_event_queue()) + + # Fallback: process queues even if GUI thread is idling/stuck + async def queue_fallback() -> None: + while True: + try: + # Headless/fallback queue processing + # Note: In GUI mode, App._gui_func still calls these for low latency. 
+ self._process_pending_gui_tasks() + # _process_pending_history_adds might need more care regarding project state + except: pass + await asyncio.sleep(0.1) + + self._loop.create_task(queue_fallback()) self._loop.run_forever() + async def _process_event_queue(self) -> None: + """Listens for and processes events from the AsyncEventQueue.""" + while True: + event_name, payload = await self.event_queue.get() + if event_name == "user_request": + self._loop.run_in_executor(None, self._handle_request_event, payload) + elif event_name == "response": + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({ + "action": "handle_ai_response", + "payload": payload + }) + elif event_name == "mma_state_update": + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({ + "action": "mma_state_update", + "payload": payload + }) + elif event_name == "mma_stream": + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({ + "action": "mma_stream_append", + "payload": payload + }) + elif event_name in ("mma_spawn_approval", "mma_step_approval"): + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append(payload) + + def _handle_request_event(self, event: events.UserRequestEvent) -> None: + """Processes a UserRequestEvent by calling the AI client.""" + if self.ui_auto_add_history: + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": "User", + "content": event.prompt, + "collapsed": False, + "ts": project_manager.now_ts() + }) + csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) + ai_client.set_custom_system_prompt("\n\n".join(csp)) + ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) + ai_client.set_agent_tools(self.ui_agent_tools) + try: + resp = ai_client.send( + event.stable_md, + event.prompt, + event.base_dir, + event.file_items, + event.disc_text, + pre_tool_callback=self._confirm_and_run, + 
qa_callback=ai_client.run_tier4_analysis + ) + asyncio.run_coroutine_threadsafe( + self.event_queue.put("response", {"text": resp, "status": "done"}), + self._loop + ) + except ProviderError as e: + asyncio.run_coroutine_threadsafe( + self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}), + self._loop + ) + except Exception as e: + asyncio.run_coroutine_threadsafe( + self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}), + self._loop + ) + + def _on_comms_entry(self, entry: Dict[str, Any]) -> None: + session_logger.log_comms(entry) + entry["local_ts"] = time.time() + kind = entry.get("kind") + payload = entry.get("payload", {}) + if kind in ("tool_result", "tool_call"): + role = "Tool" if kind == "tool_result" else "Vendor API" + content = "" + if kind == "tool_result": + content = payload.get("output", "") + else: + content = payload.get("script") or payload.get("args") or payload.get("message", "") + if isinstance(content, dict): + content = json.dumps(content, indent=1) + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": role, + "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", + "collapsed": True, + "ts": entry.get("ts", project_manager.now_ts()) + }) + if kind == "history_add": + payload = entry.get("payload", {}) + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": payload.get("role", "AI"), + "content": payload.get("content", ""), + "collapsed": payload.get("collapsed", False), + "ts": entry.get("ts", project_manager.now_ts()) + }) + return + with self._pending_comms_lock: + self._pending_comms.append(entry) + + def _on_tool_log(self, script: str, result: str) -> None: + session_logger.log_tool_call(script, result, None) + source_tier = ai_client.current_tier + with self._pending_tool_calls_lock: + self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), 
"source_tier": source_tier}) + + def _on_api_event(self, *args: Any, **kwargs: Any) -> None: + payload = kwargs.get("payload", {}) + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload}) + + def _on_performance_alert(self, message: str) -> None: + alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": "System", + "content": alert_text, + "ts": project_manager.now_ts() + }) + + def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]: + if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): + self.ai_status = "running powershell..." + output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) + self._append_tool_log(script, output) + self.ai_status = "powershell done, awaiting AI..." + return output + dialog = ConfirmDialog(script, base_dir) + is_headless = "--headless" in sys.argv + if is_headless: + with self._pending_dialog_lock: + self._pending_actions[dialog._uid] = dialog + else: + with self._pending_dialog_lock: + self._pending_dialog = dialog + + if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): + with self._api_event_queue_lock: + self._api_event_queue.append({ + "type": "script_confirmation_required", + "action_id": dialog._uid, + "script": str(script), + "base_dir": str(base_dir), + "ts": time.time() + }) + + approved, final_script = dialog.wait() + if is_headless: + with self._pending_dialog_lock: + if dialog._uid in self._pending_actions: + del self._pending_actions[dialog._uid] + if not approved: + self._append_tool_log(final_script, "REJECTED by user") + return None + self.ai_status = "running powershell..." 
+ output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) + self._append_tool_log(final_script, output) + self.ai_status = "powershell done, awaiting AI..." + return output + + def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None: + self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) + self.ui_last_script_text = script + self.ui_last_script_output = result + self._trigger_script_blink = True + self.show_script_output = True + if self.ui_auto_scroll_tool_calls: + self._scroll_tool_calls_to_bottom = True + + def resolve_pending_action(self, action_id: str, approved: bool) -> bool: + with self._pending_dialog_lock: + if action_id in self._pending_actions: + dialog = self._pending_actions[action_id] + with dialog._condition: + dialog._approved = approved + dialog._done = True + dialog._condition.notify_all() + return True + elif self._pending_dialog and self._pending_dialog._uid == action_id: + dialog = self._pending_dialog + with dialog._condition: + dialog._approved = approved + dialog._done = True + dialog._condition.notify_all() + return True + return False + + def _process_pending_gui_tasks(self) -> None: + if not self._pending_gui_tasks: + return + with self._pending_gui_tasks_lock: + tasks = self._pending_gui_tasks[:] + self._pending_gui_tasks.clear() + for task in tasks: + try: + action = task.get("action") + if action == "refresh_api_metrics": + self._refresh_api_metrics(task.get("payload", {})) + elif action == "handle_ai_response": + payload = task.get("payload", {}) + text = payload.get("text", "") + stream_id = payload.get("stream_id") + is_streaming = payload.get("status") == "streaming..." 
+ if stream_id: + if is_streaming: + if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" + self.mma_streams[stream_id] += text + else: + self.mma_streams[stream_id] = text + if stream_id == "Tier 1": + if "status" in payload: + self.ai_status = payload["status"] + else: + if is_streaming: + self.ai_response += text + else: + self.ai_response = text + self.ai_status = payload.get("status", "done") + self._trigger_blink = True + if not stream_id: + self._token_stats_dirty = True + elif action == "mma_stream_append": + payload = task.get("payload", {}) + stream_id = payload.get("stream_id") + text = payload.get("text", "") + if stream_id: + if stream_id not in self.mma_streams: + self.mma_streams[stream_id] = "" + self.mma_streams[stream_id] += text + elif action == "mma_state_update": + payload = task.get("payload", {}) + self.mma_status = payload.get("status", "idle") + self.active_tier = payload.get("active_tier") + self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage) + self.active_tickets = payload.get("tickets", []) + elif action == "mma_step_approval": + dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or "")) + self._pending_mma_approval = task + if "dialog_container" in task: + task["dialog_container"][0] = dlg + elif action == "mma_spawn_approval": + spawn_dlg = MMASpawnApprovalDialog( + str(task.get("ticket_id") or ""), + str(task.get("role") or ""), + str(task.get("prompt") or ""), + str(task.get("context_md") or "") + ) + self._pending_mma_spawn = task + if "dialog_container" in task: + task["dialog_container"][0] = spawn_dlg + except Exception as e: + print(f"Error executing GUI task: {e}") + def stop_services(self): """Stops background threads and async event loop.""" if self._loop: self._loop.call_soon_threadsafe(self._loop.stop) if self._loop_thread: self._loop_thread.join(timeout=2.0) + + @property + def current_provider(self) -> str: + return self._current_provider + + 
@current_provider.setter + def current_provider(self, value: str) -> None: + if value != self._current_provider: + self._current_provider = value + ai_client.reset_session() + ai_client.set_provider(value, self.current_model) + if value == "gemini_cli": + if not ai_client._gemini_cli_adapter: + ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) + else: + ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path + if hasattr(self, 'hook_server'): + self.hook_server.start() + self.available_models = [] + self._fetch_models(value) + self._token_stats = {} + self._token_stats_dirty = True + + @property + def current_model(self) -> str: + return self._current_model + + @current_model.setter + def current_model(self, value: str) -> None: + if value != self._current_model: + self._current_model = value + ai_client.reset_session() + ai_client.set_provider(self.current_provider, value) + self._token_stats = {} + self._token_stats_dirty = True + + def create_api(self) -> FastAPI: + """Creates and configures the FastAPI application for headless mode.""" + api = FastAPI(title="Manual Slop Headless API") + + API_KEY_NAME = "X-API-KEY" + api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) + + async def get_api_key(header_key: str = Depends(api_key_header)) -> str: + """Validates the API key from the request header against configuration.""" + headless_cfg = self.config.get("headless", {}) + config_key = headless_cfg.get("api_key", "").strip() + env_key = os.environ.get("SLOP_API_KEY", "").strip() + target_key = env_key or config_key + if not target_key: + raise HTTPException(status_code=403, detail="API Key not configured on server") + if header_key == target_key: + return header_key + raise HTTPException(status_code=403, detail="Could not validate API Key") + + @api.get("/health") + def health() -> dict[str, str]: + """Returns the health status of the API.""" + return {"status": "ok"} + + @api.get("/status", 
dependencies=[Depends(get_api_key)]) + def status() -> dict[str, Any]: + """Returns the current status of the application.""" + return { + "provider": self.current_provider, + "model": self.current_model, + "status": self.ai_status, + "usage": self.session_usage + } + + @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) + def generate(req: GenerateRequest) -> dict[str, Any]: + """Triggers an AI generation request using the current project context.""" + if not req.prompt.strip(): + raise HTTPException(status_code=400, detail="Prompt cannot be empty") + with self._send_thread_lock: + start_time = time.time() + try: + md, path, file_items, stable_md, disc_text = self._do_generate() + self._last_stable_md = stable_md + self.last_md = md + self.last_md_path = path + self.last_file_items = file_items + except Exception as e: + raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") + user_msg = req.prompt + base_dir = self.ui_files_base_dir + csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) + ai_client.set_custom_system_prompt("\n\n".join(csp)) + temp = req.temperature if req.temperature is not None else self.temperature + tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens + ai_client.set_model_params(temp, tokens, self.history_trunc_limit) + ai_client.set_agent_tools(self.ui_agent_tools) + if req.auto_add_history: + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": "User", + "content": user_msg, + "collapsed": False, + "ts": project_manager.now_ts() + }) + try: + resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) + if req.auto_add_history: + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": "AI", + "content": resp, + "collapsed": False, + "ts": project_manager.now_ts() + }) + self._recalculate_session_usage() + duration = time.time() - start_time + 
return { + "text": resp, + "metadata": { + "provider": self.current_provider, + "model": self.current_model, + "duration_sec": round(duration, 3), + "timestamp": project_manager.now_ts() + }, + "usage": self.session_usage + } + except ProviderError as e: + raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") + + @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) + async def stream(req: GenerateRequest) -> Any: + """Placeholder for streaming AI generation responses (Not yet implemented).""" + raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") + + @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) + def pending_actions() -> list[dict[str, Any]]: + """Lists all pending PowerShell scripts awaiting confirmation.""" + with self._pending_dialog_lock: + return [ + {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} + for uid, diag in self._pending_actions.items() + ] + + @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) + def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: + """Approves or rejects a pending action.""" + with self._pending_dialog_lock: + if action_id not in self._pending_actions: + raise HTTPException(status_code=404, detail="Action not found") + dialog = self._pending_actions.pop(action_id) + if req.script is not None: + dialog._script = req.script + with dialog._condition: + dialog._approved = req.approved + dialog._done = True + dialog._condition.notify_all() + return {"status": "confirmed" if req.approved else "rejected"} + + @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) + def list_sessions() -> list[str]: + """Lists all session log files.""" + log_dir = Path("logs") + if not log_dir.exists(): + return [] + return [f.name 
for f in log_dir.glob("*.log")] + + @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) + def get_session(session_id: str) -> dict[str, Any]: + """Returns the content of a specific session log.""" + log_path = Path("logs") / session_id + if not log_path.exists(): + raise HTTPException(status_code=404, detail="Session log not found") + return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} + + @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) + def delete_session(session_id: str) -> dict[str, str]: + """Deletes a specific session log.""" + log_path = Path("logs") / session_id + if not log_path.exists(): + raise HTTPException(status_code=404, detail="Session log not found") + log_path.unlink() + return {"status": "deleted"} + + @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) + def get_context() -> dict[str, Any]: + """Returns the current aggregated project context.""" + try: + md, path, file_items, stable_md, disc_text = self._do_generate() + # Pull current screenshots if available in project + screenshots = self.project.get("screenshots", {}).get("paths", []) + return { + "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], + "screenshots": screenshots, + "files_base_dir": self.ui_files_base_dir, + "markdown": md, + "discussion": disc_text + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") + + @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) + def token_stats() -> dict[str, Any]: + """Returns current token usage and budget statistics.""" + return self._token_stats + + return api + + def _cb_new_project_automated(self, user_data: Any) -> None: + if user_data: + name = Path(user_data).stem + proj = project_manager.default_project(name) + project_manager.save_project(proj, user_data) + if user_data not in self.project_paths: + 
self.project_paths.append(user_data) + self._switch_project(user_data) + + def _cb_project_save(self) -> None: + self._flush_to_project() + self._save_active_project() + self._flush_to_config() + save_config(self.config) + self.ai_status = "config saved" + + def _cb_disc_create(self) -> None: + nm = self.ui_disc_new_name_input.strip() + if nm: + self._create_discussion(nm) + self.ui_disc_new_name_input = "" + + def _switch_project(self, path: str) -> None: + if not Path(path).exists(): + self.ai_status = f"project file not found: {path}" + return + self._flush_to_project() + self._save_active_project() + try: + self.project = project_manager.load_project(path) + self.active_project_path = path + except Exception as e: + self.ai_status = f"failed to load project: {e}" + return + self._refresh_from_project() + self.ai_status = f"switched to: {Path(path).stem}" + + def _refresh_from_project(self) -> None: + self.files = list(self.project.get("files", {}).get("paths", [])) + self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) + disc_sec = self.project.get("discussion", {}) + self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES))) + self.active_discussion = disc_sec.get("active", "main") + disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) + with self._disc_entries_lock: + self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles) + proj = self.project + self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") + self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") + self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") + self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "") + self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "") + self.ui_project_main_context = proj.get("project", {}).get("main_context", "") + self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini") 
+ self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) + self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) + self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) + self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) + self.ui_summary_only = proj.get("project", {}).get("summary_only", False) + agent_tools_cfg = proj.get("agent", {}).get("tools", {}) + self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} + # MMA Tracks + self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) + # Restore MMA state + mma_sec = proj.get("mma", {}) + self.ui_epic_input = mma_sec.get("epic", "") + at_data = mma_sec.get("active_track") + if at_data: + try: + tickets = [] + for t_data in at_data.get("tickets", []): + tickets.append(Ticket(**t_data)) + self.active_track = Track( + id=at_data.get("id"), + description=at_data.get("description"), + tickets=tickets + ) + self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table + except Exception as e: + print(f"Failed to deserialize active track: {e}") + self.active_track = None + else: + self.active_track = None + self.active_tickets = [] + # Load track-scoped history if track is active + if self.active_track: + track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) + if track_history: + with self._disc_entries_lock: + self.disc_entries = parse_history_entries(track_history, self.disc_roles) + + def _cb_load_track(self, track_id: str) -> None: + state = project_manager.load_track_state(track_id, self.ui_files_base_dir) + if state: + try: + # Convert list[Ticket] or list[dict] to list[Ticket] for Track object + tickets = [] + for t in state.tasks: + if isinstance(t, dict): + tickets.append(Ticket(**t)) + else: + tickets.append(t) + self.active_track = Track( + id=state.metadata.id, + description=state.metadata.name, + tickets=tickets 
+ ) + # Keep dicts for UI table (or convert Ticket objects back to dicts if needed) + self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] + # Load track-scoped history + history = project_manager.load_track_history(track_id, self.ui_files_base_dir) + with self._disc_entries_lock: + if history: + self.disc_entries = parse_history_entries(history, self.disc_roles) + else: + self.disc_entries = [] + self._recalculate_session_usage() + self.ai_status = f"Loaded track: {state.metadata.name}" + except Exception as e: + self.ai_status = f"Load track error: {e}" + print(f"Error loading track {track_id}: {e}") + + def _save_active_project(self) -> None: + if self.active_project_path: + try: + project_manager.save_project(self.project, self.active_project_path) + except Exception as e: + self.ai_status = f"save error: {e}" + + def _get_discussion_names(self) -> list[str]: + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + return sorted(discussions.keys()) + + def _switch_discussion(self, name: str) -> None: + self._flush_disc_entries_to_project() + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if name not in discussions: + self.ai_status = f"discussion not found: {name}" + return + self.active_discussion = name + self._track_discussion_active = False + disc_sec["active"] = name + disc_data = discussions[name] + with self._disc_entries_lock: + self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles) + self.ai_status = f"discussion: {name}" + + def _flush_disc_entries_to_project(self) -> None: + history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] + if self.active_track and self._track_discussion_active: + project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) + return + disc_sec = self.project.setdefault("discussion", {}) + discussions = 
disc_sec.setdefault("discussions", {}) + disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) + disc_data["history"] = history_strings + disc_data["last_updated"] = project_manager.now_ts() + + def _create_discussion(self, name: str) -> None: + disc_sec = self.project.setdefault("discussion", {}) + discussions = disc_sec.setdefault("discussions", {}) + if name in discussions: + self.ai_status = f"discussion '{name}' already exists" + return + discussions[name] = project_manager.default_discussion() + self._switch_discussion(name) + + def _rename_discussion(self, old_name: str, new_name: str) -> None: + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if old_name not in discussions: + return + if new_name in discussions: + self.ai_status = f"discussion '{new_name}' already exists" + return + discussions[new_name] = discussions.pop(old_name) + if self.active_discussion == old_name: + self.active_discussion = new_name + disc_sec["active"] = new_name + + def _delete_discussion(self, name: str) -> None: + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if len(discussions) <= 1: + self.ai_status = "cannot delete the last discussion" + return + if name not in discussions: + return + del discussions[name] + if self.active_discussion == name: + remaining = sorted(discussions.keys()) + self._switch_discussion(remaining[0]) + + def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: + if self._pending_mma_approval: + dlg = self._pending_mma_approval.get("dialog_container", [None])[0] + if dlg: + with dlg._condition: + dlg._approved = approved + if payload is not None: + dlg._payload = payload + dlg._done = True + dlg._condition.notify_all() + self._pending_mma_approval = None + if self._pending_mma_spawn: + spawn_dlg = 
self._pending_mma_spawn.get("dialog_container", [None])[0] + if spawn_dlg: + with spawn_dlg._condition: + spawn_dlg._approved = approved + spawn_dlg._abort = abort + if prompt is not None: + spawn_dlg._prompt = prompt + if context_md is not None: + spawn_dlg._context_md = context_md + spawn_dlg._done = True + spawn_dlg._condition.notify_all() + self._pending_mma_spawn = None + + def _handle_approve_ask(self) -> None: + """Responds with approval for a pending /api/ask request.""" + if not self._ask_request_id: return + request_id = self._ask_request_id + + def do_post() -> None: + try: + requests.post( + "http://127.0.0.1:8999/api/ask/respond", + json={"request_id": request_id, "response": {"approved": True}}, + timeout=2 + ) + except Exception as e: print(f"Error responding to ask: {e}") + threading.Thread(target=do_post, daemon=True).start() + self._pending_ask_dialog = False + self._ask_request_id = None + self._ask_tool_data = None + + def _handle_reject_ask(self) -> None: + """Responds with rejection for a pending /api/ask request.""" + if not self._ask_request_id: return + request_id = self._ask_request_id + + def do_post() -> None: + try: + requests.post( + "http://127.0.0.1:8999/api/ask/respond", + json={"request_id": request_id, "response": {"approved": False}}, + timeout=2 + ) + except Exception as e: print(f"Error responding to ask: {e}") + threading.Thread(target=do_post, daemon=True).start() + self._pending_ask_dialog = False + self._ask_request_id = None + self._ask_tool_data = None + + def _handle_reset_session(self) -> None: + """Logic for resetting the AI session.""" + ai_client.reset_session() + ai_client.clear_comms_log() + self._tool_log.clear() + self._comms_log.clear() + self.disc_entries.clear() + # Clear history in project dict too + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if self.active_discussion in discussions: + discussions[self.active_discussion]["history"] = [] + self.ai_status = 
"session reset" + self.ai_response = "" + self.ui_ai_input = "" + with self._pending_history_adds_lock: + self._pending_history_adds.clear() + + def _handle_md_only(self) -> None: + """Logic for the 'MD Only' action.""" + try: + md, path, *_ = self._do_generate() + self.last_md = md + self.last_md_path = path + self.ai_status = f"md written: {path.name}" + # Refresh token budget metrics with CURRENT md + self._refresh_api_metrics({}, md_content=md) + except Exception as e: + self.ai_status = f"error: {e}" + + def _handle_generate_send(self) -> None: + """Logic for the 'Gen + Send' action.""" + try: + md, path, file_items, stable_md, disc_text = self._do_generate() + self._last_stable_md = stable_md + self.last_md = md + self.last_md_path = path + self.last_file_items = file_items + except Exception as e: + self.ai_status = f"generate error: {e}" + return + self.ai_status = "sending..." + user_msg = self.ui_ai_input + base_dir = self.ui_files_base_dir + # Prepare event payload + event_payload = events.UserRequestEvent( + prompt=user_msg, + stable_md=stable_md, + file_items=file_items, + disc_text=disc_text, + base_dir=base_dir + ) + # Push to async queue + asyncio.run_coroutine_threadsafe( + self.event_queue.put("user_request", event_payload), + self._loop + ) + + def _recalculate_session_usage(self) -> None: + usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} + for entry in ai_client.get_comms_log(): + if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): + u = entry["payload"]["usage"] + for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: + if k in usage: + usage[k] += u.get(k, 0) or 0 + self.session_usage = usage + + def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: + if "latency" in payload: + self.session_usage["last_latency"] = 
payload["latency"] + self._recalculate_session_usage() + + if md_content is not None: + stats = ai_client.get_token_stats(md_content) + # Ensure compatibility if keys are named differently + if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: + stats["estimated_prompt_tokens"] = stats["total_tokens"] + self._token_stats = stats + + cache_stats = payload.get("cache_stats") + if cache_stats: + count = cache_stats.get("cache_count", 0) + size_bytes = cache_stats.get("total_size_bytes", 0) + self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" + + def _flush_to_project(self) -> None: + proj = self.project + proj.setdefault("output", {})["output_dir"] = self.ui_output_dir + proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir + proj["files"]["paths"] = self.files + proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir + proj["screenshots"]["paths"] = self.screenshots + proj.setdefault("project", {}) + proj["project"]["git_dir"] = self.ui_project_git_dir + proj["project"]["system_prompt"] = self.ui_project_system_prompt + proj["project"]["main_context"] = self.ui_project_main_context + proj["project"]["word_wrap"] = self.ui_word_wrap + proj["project"]["summary_only"] = self.ui_summary_only + proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms + proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls + proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path + proj.setdefault("agent", {}).setdefault("tools", {}) + for t_name in AGENT_TOOL_NAMES: + proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) + self._flush_disc_entries_to_project() + disc_sec = proj.setdefault("discussion", {}) + disc_sec["roles"] = self.disc_roles + disc_sec["active"] = self.active_discussion + disc_sec["auto_add"] = self.ui_auto_add_history + # Save MMA State + mma_sec = proj.setdefault("mma", {}) + mma_sec["epic"] = self.ui_epic_input + if 
self.active_track: + mma_sec["active_track"] = asdict(self.active_track) + else: + mma_sec["active_track"] = None + + def _flush_to_config(self) -> None: + self.config["ai"] = { + "provider": self.current_provider, + "model": self.current_model, + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "history_trunc_limit": self.history_trunc_limit, + } + self.config["ai"]["system_prompt"] = self.ui_global_system_prompt + self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} + self.config["gui"] = {"show_windows": self.show_windows} + theme.save_to_config(self.config) + + def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: + """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" + self._flush_to_project() + self._save_active_project() + self._flush_to_config() + save_config(self.config) + track_id = self.active_track.id if self.active_track else None + flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) + full_md, path, file_items = aggregate.run(flat) + # Build stable markdown (no history) for Gemini caching + screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) + screenshots = flat.get("screenshots", {}).get("paths", []) + summary_only = flat.get("project", {}).get("summary_only", False) + stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) + # Build discussion history text separately + history = flat.get("discussion", {}).get("history", []) + discussion_text = aggregate.build_discussion_text(history) + return full_md, path, file_items, stable_md, discussion_text + + def _cb_plan_epic(self) -> None: + def _bg_task() -> None: + try: + self.ai_status = "Planning Epic (Tier 1)..." 
+ history = orchestrator_pm.get_track_history_summary() + proj = project_manager.load_project(self.active_project_path) + flat = project_manager.flat_config(proj) + file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", [])) + _t1_baseline = len(ai_client.get_comms_log()) + tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) + _t1_new = ai_client.get_comms_log()[_t1_baseline:] + _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"] + _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp) + _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp) + def _push_t1_usage(i: int, o: int) -> None: + self.mma_tier_usage["Tier 1"]["input"] += i + self.mma_tier_usage["Tier 1"]["output"] += o + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({ + "action": "custom_callback", + "callback": _push_t1_usage, + "args": [_t1_in, _t1_out] + }) + self._pending_gui_tasks.append({ + "action": "handle_ai_response", + "payload": { + "text": json.dumps(tracks, indent=2), + "stream_id": "Tier 1", + "status": "Epic tracks generated." + } + }) + self._pending_gui_tasks.append({ + "action": "show_track_proposal", + "payload": tracks + }) + except Exception as e: + self.ai_status = f"Epic plan error: {e}" + print(f"ERROR in _cb_plan_epic background task: {e}") + threading.Thread(target=_bg_task, daemon=True).start() + + def _cb_accept_tracks(self) -> None: + self._show_track_proposal_modal = False + def _bg_task() -> None: + # Generate skeletons once + self.ai_status = "Phase 2: Generating skeletons for all tracks..." + parser = ASTParser(language="python") + generated_skeletons = "" + try: + for i, file_path in enumerate(self.files): + try: + self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." 
+ abs_path = Path(self.ui_files_base_dir) / file_path + if abs_path.exists() and abs_path.suffix == ".py": + with open(abs_path, "r", encoding="utf-8") as f: + code = f.read() + generated_skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n" + except Exception as e: + print(f"Error parsing skeleton for {file_path}: {e}") + except Exception as e: + self.ai_status = f"Error generating skeletons: {e}" + print(f"Error generating skeletons: {e}") + return # Exit if skeleton generation fails + + # Now loop through tracks and call _start_track_logic with generated skeletons + total_tracks = len(self.proposed_tracks) + for i, track_data in enumerate(self.proposed_tracks): + title = track_data.get("title") or track_data.get("goal", "Untitled Track") + self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..." + self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons + + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started + self.ai_status = f"All {total_tracks} tracks accepted and execution started." 
+ threading.Thread(target=_bg_task, daemon=True).start() + + def _cb_start_track(self, user_data: Any = None) -> None: + if isinstance(user_data, str): + # If track_id is provided directly + track_id = user_data + # Ensure it's loaded as active + if not self.active_track or self.active_track.id != track_id: + self._cb_load_track(track_id) + + if self.active_track: + # Use the active track object directly to start execution + self.mma_status = "running" + engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) + flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) + full_md, _, _ = aggregate.run(flat) + asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) + self.ai_status = f"Track '{self.active_track.description}' started." + return + + idx = 0 + if isinstance(user_data, int): + idx = user_data + elif isinstance(user_data, dict): + idx = user_data.get("index", 0) + if 0 <= idx < len(self.proposed_tracks): + track_data = self.proposed_tracks[idx] + title = track_data.get("title") or track_data.get("goal", "Untitled Track") + threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() + self.ai_status = f"Track '{title}' started." + + def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None: + try: + goal = track_data.get("goal", "") + title = track_data.get("title") or track_data.get("goal", "Untitled Track") + self.ai_status = f"Phase 2: Generating tickets for {title}..." + + skeletons = "" # Initialize skeletons variable + if skeletons_str is None: # Only generate if not provided + # 1. Get skeletons for context + parser = ASTParser(language="python") + for i, file_path in enumerate(self.files): + try: + self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." 
+ abs_path = Path(self.ui_files_base_dir) / file_path + if abs_path.exists() and abs_path.suffix == ".py": + with open(abs_path, "r", encoding="utf-8") as f: + code = f.read() + skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n" + except Exception as e: + print(f"Error parsing skeleton for {file_path}: {e}") + else: + skeletons = skeletons_str # Use provided skeletons + + self.ai_status = "Phase 2: Calling Tech Lead..." + _t2_baseline = len(ai_client.get_comms_log()) + raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) + _t2_new = ai_client.get_comms_log()[_t2_baseline:] + _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] + _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) + _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) + self.mma_tier_usage["Tier 2"]["input"] += _t2_in + self.mma_tier_usage["Tier 2"]["output"] += _t2_out + if not raw_tickets: + self.ai_status = f"Error: No tickets generated for track: {title}" + print(f"Warning: No tickets generated for track: {title}") + return + self.ai_status = "Phase 2: Sorting tickets..." + try: + sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) + except ValueError as e: + print(f"Dependency error in track '{title}': {e}") + sorted_tickets_data = raw_tickets + # 3. 
Create Track and Ticket objects + tickets = [] + for t_data in sorted_tickets_data: + ticket = Ticket( + id=t_data["id"], + description=t_data.get("description") or t_data.get("goal", "No description"), + status=t_data.get("status", "todo"), + assigned_to=t_data.get("assigned_to", "unassigned"), + depends_on=t_data.get("depends_on", []), + step_mode=t_data.get("step_mode", False) + ) + tickets.append(ticket) + track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" + track = Track(id=track_id, description=title, tickets=tickets) + # Initialize track state in the filesystem + from src.models import TrackState, Metadata + meta = Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now()) + state = TrackState(metadata=meta, discussion=[], tasks=tickets) + project_manager.save_track_state(track_id, state, self.ui_files_base_dir) + # 4. Initialize ConductorEngine and run loop + engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) + # Use current full markdown context for the track execution + track_id_param = track.id + flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) + full_md, _, _ = aggregate.run(flat) + # Schedule the coroutine on the internal event loop + asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) + except Exception as e: + self.ai_status = f"Track start error: {e}" + print(f"ERROR in _start_track_logic: {e}") + + def _cb_ticket_retry(self, ticket_id: str) -> None: + for t in self.active_tickets: + if t.get('id') == ticket_id: + t['status'] = 'todo' + break + asyncio.run_coroutine_threadsafe( + self.event_queue.put("mma_retry", {"ticket_id": ticket_id}), + self._loop + ) + + def _cb_ticket_skip(self, ticket_id: str) -> None: + for t in self.active_tickets: + if t.get('id') == ticket_id: + t['status'] = 'skipped' + break + 
asyncio.run_coroutine_threadsafe( + self.event_queue.put("mma_skip", {"ticket_id": ticket_id}), + self._loop + ) + + def _cb_run_conductor_setup(self) -> None: + base = Path("conductor") + if not base.exists(): + self.ui_conductor_setup_summary = "Error: conductor/ directory not found." + return + files = list(base.glob("**/*")) + files = [f for f in files if f.is_file()] + summary = [f"Conductor Directory: {base.absolute()}"] + summary.append(f"Total Files: {len(files)}") + total_lines = 0 + for f in files: + try: + with open(f, "r", encoding="utf-8") as fd: + lines = len(fd.readlines()) + total_lines += lines + summary.append(f"- {f.relative_to(base)}: {lines} lines") + except Exception: + summary.append(f"- {f.relative_to(base)}: Error reading") + summary.append(f"Total Line Count: {total_lines}") + tracks_dir = base / "tracks" + if tracks_dir.exists(): + tracks = [d for d in tracks_dir.iterdir() if d.is_dir()] + summary.append(f"Total Tracks Found: {len(tracks)}") + else: + summary.append("Tracks Directory: Not found") + self.ui_conductor_setup_summary = "\n".join(summary) + + def _cb_create_track(self, name: str, desc: str, track_type: str) -> None: + if not name: return + date_suffix = datetime.now().strftime("%Y%m%d") + track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}" + track_dir = Path("conductor/tracks") / track_id + track_dir.mkdir(parents=True, exist_ok=True) + spec_file = track_dir / "spec.md" + with open(spec_file, "w", encoding="utf-8") as f: + f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n") + plan_file = track_dir / "plan.md" + with open(plan_file, "w", encoding="utf-8") as f: + f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n") + meta_file = track_dir / "metadata.json" + with open(meta_file, "w", encoding="utf-8") as f: + json.dump({ + "id": track_id, + "title": name, + "description": desc, + "type": track_type, + "status": "new", + "progress": 0.0 + }, f, indent=1) + # Refresh 
tracks from disk + self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) + + def _push_mma_state_update(self) -> None: + if not self.active_track: + return + # Sync active_tickets (list of dicts) back to active_track.tickets (list of Ticket objects) + self.active_track.tickets = [Ticket.from_dict(t) for t in self.active_tickets] + # Save the state to disk + from src.project_manager import save_track_state, load_track_state + from src.models import TrackState, Metadata + + existing = load_track_state(self.active_track.id, self.ui_files_base_dir) + meta = Metadata( + id=self.active_track.id, + name=self.active_track.description, + status=self.mma_status, + created_at=existing.metadata.created_at if existing else datetime.now(), + updated_at=datetime.now() + ) + state = TrackState( + metadata=meta, + discussion=existing.discussion if existing else [], + tasks=self.active_track.tickets + ) + save_track_state(self.active_track.id, state, self.ui_files_base_dir) diff --git a/src/gui_2.py b/src/gui_2.py index 522f33e..b8eb898 100644 --- a/src/gui_2.py +++ b/src/gui_2.py @@ -87,54 +87,6 @@ def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict return entries[i:] return entries -class ConfirmDialog: - def __init__(self, script: str, base_dir: str) -> None: - self._uid = str(uuid.uuid4()) - self._script = str(script) if script is not None else "" - self._base_dir = str(base_dir) if base_dir is not None else "" - self._condition = threading.Condition() - self._done = False - self._approved = False - - def wait(self) -> tuple[bool, str]: - with self._condition: - while not self._done: - self._condition.wait(timeout=0.1) - return self._approved, self._script - -class MMAApprovalDialog: - def __init__(self, ticket_id: str, payload: str) -> None: - self._payload = payload - self._condition = threading.Condition() - self._done = False - self._approved = False - - def wait(self) -> tuple[bool, str]: - with self._condition: - while not 
self._done: - self._condition.wait(timeout=0.1) - return self._approved, self._payload - -class MMASpawnApprovalDialog: - def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: - self._prompt = prompt - self._context_md = context_md - self._condition = threading.Condition() - self._done = False - self._approved = False - self._abort = False - - def wait(self) -> dict[str, Any]: - with self._condition: - while not self._done: - self._condition.wait(timeout=0.1) - return { - 'approved': self._approved, - 'abort': self._abort, - 'prompt': self._prompt, - 'context_md': self._context_md - } - class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True @@ -152,6 +104,7 @@ class App: # Initialize controller and delegate state self.controller = AppController() self.controller.init_state() + self.controller.start_services() # Aliases for controller-owned locks self._send_thread_lock = self.controller._send_thread_lock @@ -162,117 +115,12 @@ class App: self._pending_gui_tasks_lock = self.controller._pending_gui_tasks_lock self._pending_dialog_lock = self.controller._pending_dialog_lock self._api_event_queue_lock = self.controller._api_event_queue_lock + + # UI-specific initialization + self._init_ui_actions() - self._pending_dialog: ConfirmDialog | None = None - self._pending_dialog_open: bool = False - self._pending_actions: dict[str, ConfirmDialog] = {} - self._pending_ask_dialog: bool = False - - self._prune_old_logs() - self._init_ai_and_hooks() - - def __getattr__(self, name: str) -> Any: - if name != 'controller' and hasattr(self, 'controller') and hasattr(self.controller, name): - return getattr(self.controller, name) - raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") - - def __setattr__(self, name: str, value: Any) -> None: - if name == 'controller': - super().__setattr__(name, value) - elif hasattr(self, 'controller') and hasattr(self.controller, name): - setattr(self.controller, name, 
value) - else: - super().__setattr__(name, value) - - def _prune_old_logs(self) -> None: - """Asynchronously prunes old insignificant logs on startup.""" - - def run_prune() -> None: - try: - registry = LogRegistry("logs/log_registry.toml") - pruner = LogPruner(registry, "logs") - pruner.prune() - except Exception as e: - print(f"Error during log pruning: {e}") - thread = threading.Thread(target=run_prune, daemon=True) - thread.start() - - @property - def current_provider(self) -> str: - return self._current_provider - - @current_provider.setter - def current_provider(self, value: str) -> None: - if value != self._current_provider: - self._current_provider = value - ai_client.reset_session() - ai_client.set_provider(value, self.current_model) - if value == "gemini_cli": - if not ai_client._gemini_cli_adapter: - ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) - else: - ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path - if hasattr(self, 'hook_server'): - self.hook_server.start() - self.available_models = [] - self.available_models = [] - self._fetch_models(value) - self._token_stats = {} - self._token_stats_dirty = True - - @property - def current_model(self) -> str: - return self._current_model - - @current_model.setter - def current_model(self, value: str) -> None: - if value != self._current_model: - self._current_model = value - ai_client.reset_session() - ai_client.set_provider(self.current_provider, value) - self._token_stats = {} - self._token_stats_dirty = True - - def _init_ai_and_hooks(self) -> None: - ai_client.set_provider(self.current_provider, self.current_model) - if self.current_provider == "gemini_cli": - if not ai_client._gemini_cli_adapter: - ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) - else: - ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path - ai_client.confirm_and_run_callback = self._confirm_and_run - 
ai_client.comms_log_callback = self._on_comms_entry - ai_client.tool_log_callback = self._on_tool_log - mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics - self.perf_monitor.alert_callback = self._on_performance_alert - ai_client.events.on("request_start", self._on_api_event) - ai_client.events.on("response_received", self._on_api_event) - ai_client.events.on("tool_execution", self._on_api_event) - self._settable_fields: dict[str, str] = { - 'ai_input': 'ui_ai_input', - 'project_git_dir': 'ui_project_git_dir', - 'auto_add_history': 'ui_auto_add_history', - 'disc_new_name_input': 'ui_disc_new_name_input', - 'project_main_context': 'ui_project_main_context', - 'gcli_path': 'ui_gemini_cli_path', - 'output_dir': 'ui_output_dir', - 'files_base_dir': 'ui_files_base_dir', - 'ai_status': 'ai_status', - 'ai_response': 'ai_response', - 'active_discussion': 'active_discussion', - 'current_provider': 'current_provider', - 'current_model': 'current_model', - 'token_budget_pct': '_token_budget_pct', - 'token_budget_current': '_token_budget_current', - 'token_budget_label': '_token_budget_label', - 'show_confirm_modal': 'show_confirm_modal', - 'mma_epic_input': 'ui_epic_input', - 'mma_status': 'mma_status', - 'mma_active_tier': 'active_tier', - 'ui_new_track_name': 'ui_new_track_name', - 'ui_new_track_desc': 'ui_new_track_desc', - 'manual_approve': 'ui_manual_approve' - } + def _init_ui_actions(self) -> None: + # Set up UI-specific action maps self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, @@ -294,450 +142,39 @@ class App: } self._discussion_names_cache: list[str] = [] self._discussion_names_dirty: bool = True - self.hook_server = api_hooks.HookServer(self) - self.hook_server.start() - def create_api(self) -> FastAPI: - """Creates and configures the FastAPI application for headless mode.""" - api = FastAPI(title="Manual Slop Headless API") + def __getattr__(self, name: 
str) -> Any: + if name != 'controller' and hasattr(self, 'controller') and hasattr(self.controller, name): + return getattr(self.controller, name) + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") - API_KEY_NAME = "X-API-KEY" - api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) - - async def get_api_key(header_key: str = Depends(api_key_header)) -> str: - """Validates the API key from the request header against configuration.""" - headless_cfg = self.config.get("headless", {}) - config_key = headless_cfg.get("api_key", "").strip() - env_key = os.environ.get("SLOP_API_KEY", "").strip() - target_key = env_key or config_key - if not target_key: - raise HTTPException(status_code=403, detail="API Key not configured on server") - if header_key == target_key: - return header_key - raise HTTPException(status_code=403, detail="Could not validate API Key") - - @api.get("/health") - def health() -> dict[str, str]: - """Returns the health status of the API.""" - return {"status": "ok"} - - @api.get("/status", dependencies=[Depends(get_api_key)]) - def status() -> dict[str, Any]: - """Returns the current status of the application.""" - return { - "provider": self.current_provider, - "model": self.current_model, - "status": self.ai_status, - "usage": self.session_usage - } - - @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) - def generate(req: GenerateRequest) -> dict[str, Any]: - """Triggers an AI generation request using the current project context.""" - if not req.prompt.strip(): - raise HTTPException(status_code=400, detail="Prompt cannot be empty") - with self._send_thread_lock: - start_time = time.time() - try: - md, path, file_items, stable_md, disc_text = self._do_generate() - self._last_stable_md = stable_md - self.last_md = md - self.last_md_path = path - self.last_file_items = file_items - except Exception as e: - raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") - 
user_msg = req.prompt - base_dir = self.ui_files_base_dir - csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) - ai_client.set_custom_system_prompt("\n\n".join(csp)) - temp = req.temperature if req.temperature is not None else self.temperature - tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens - ai_client.set_model_params(temp, tokens, self.history_trunc_limit) - ai_client.set_agent_tools(self.ui_agent_tools) - if req.auto_add_history: - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": "User", - "content": user_msg, - "collapsed": False, - "ts": project_manager.now_ts() - }) - try: - resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) - if req.auto_add_history: - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": "AI", - "content": resp, - "collapsed": False, - "ts": project_manager.now_ts() - }) - self._recalculate_session_usage() - duration = time.time() - start_time - return { - "text": resp, - "metadata": { - "provider": self.current_provider, - "model": self.current_model, - "duration_sec": round(duration, 3), - "timestamp": project_manager.now_ts() - }, - "usage": self.session_usage - } - except ProviderError as e: - raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") - except Exception as e: - raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") - - @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) - async def stream(req: GenerateRequest) -> Any: - """Placeholder for streaming AI generation responses (Not yet implemented).""" - raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") - - @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) - def pending_actions() -> list[dict[str, Any]]: - """Lists all pending PowerShell 
scripts awaiting confirmation.""" - with self._pending_dialog_lock: - return [ - {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} - for uid, diag in self._pending_actions.items() - ] - - @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) - def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: - """Approves or rejects a pending action.""" - with self._pending_dialog_lock: - if action_id not in self._pending_actions: - raise HTTPException(status_code=404, detail="Action not found") - dialog = self._pending_actions.pop(action_id) - if req.script is not None: - dialog._script = req.script - with dialog._condition: - dialog._approved = req.approved - dialog._done = True - dialog._condition.notify_all() - return {"status": "confirmed" if req.approved else "rejected"} - - @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) - def list_sessions() -> list[str]: - """Lists all session log files.""" - log_dir = Path("logs") - if not log_dir.exists(): - return [] - return [f.name for f in log_dir.glob("*.log")] - - @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) - def get_session(session_id: str) -> dict[str, Any]: - """Returns the content of a specific session log.""" - log_path = Path("logs") / session_id - if not log_path.exists(): - raise HTTPException(status_code=404, detail="Session log not found") - return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} - - @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) - def delete_session(session_id: str) -> dict[str, str]: - """Deletes a specific session log.""" - log_path = Path("logs") / session_id - if not log_path.exists(): - raise HTTPException(status_code=404, detail="Session log not found") - log_path.unlink() - return {"status": "deleted"} - - @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) - def get_context() -> dict[str, 
Any]: - """Returns the current aggregated project context.""" - try: - md, path, file_items, stable_md, disc_text = self._do_generate() - # Pull current screenshots if available in project - screenshots = self.project.get("screenshots", {}).get("paths", []) - return { - "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], - "screenshots": screenshots, - "files_base_dir": self.ui_files_base_dir, - "markdown": md, - "discussion": disc_text - } - except Exception as e: - raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") - - @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) - def token_stats() -> dict[str, Any]: - """Returns current token usage and budget statistics.""" - return self._token_stats - - return api - # ---------------------------------------------------------------- project loading - - def _cb_new_project_automated(self, user_data: Any) -> None: - if user_data: - name = Path(user_data).stem - proj = project_manager.default_project(name) - project_manager.save_project(proj, user_data) - if user_data not in self.project_paths: - self.project_paths.append(user_data) - self._switch_project(user_data) - - def _cb_project_save(self) -> None: - self._flush_to_project() - self._save_active_project() - self._flush_to_config() - save_config(self.config) - self.ai_status = "config saved" - - def _cb_disc_create(self) -> None: - nm = self.ui_disc_new_name_input.strip() - if nm: - self._create_discussion(nm) - self.ui_disc_new_name_input = "" - - def _switch_project(self, path: str) -> None: - if not Path(path).exists(): - self.ai_status = f"project file not found: {path}" - return - self._flush_to_project() - self._save_active_project() - try: - self.project = project_manager.load_project(path) - self.active_project_path = path - except Exception as e: - self.ai_status = f"failed to load project: {e}" - return - self._refresh_from_project() - self._discussion_names_dirty = True - 
ai_client.reset_session() - self.ai_status = f"switched to: {Path(path).stem}" - - def _refresh_from_project(self) -> None: - self.files = list(self.project.get("files", {}).get("paths", [])) - self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) - disc_sec = self.project.get("discussion", {}) - self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES))) - self.active_discussion = disc_sec.get("active", "main") - disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) - with self._disc_entries_lock: - self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles) - proj = self.project - self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") - self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") - self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") - self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "") - self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "") - self.ui_project_main_context = proj.get("project", {}).get("main_context", "") - self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini") - self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) - self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) - self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) - self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) - self.ui_summary_only = proj.get("project", {}).get("summary_only", False) - agent_tools_cfg = proj.get("agent", {}).get("tools", {}) - self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} - # MMA Tracks - self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) - # Restore MMA state - mma_sec = proj.get("mma", {}) - self.ui_epic_input = mma_sec.get("epic", "") - at_data = mma_sec.get("active_track") - if at_data: - 
try: - tickets = [] - for t_data in at_data.get("tickets", []): - tickets.append(Ticket(**t_data)) - self.active_track = Track( - id=at_data.get("id"), - description=at_data.get("description"), - tickets=tickets - ) - self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table - except Exception as e: - print(f"Failed to deserialize active track: {e}") - self.active_track = None + def __setattr__(self, name: str, value: Any) -> None: + if name == 'controller': + super().__setattr__(name, value) + elif hasattr(self, 'controller') and hasattr(self.controller, name): + setattr(self.controller, name, value) else: - self.active_track = None - self.active_tickets = [] - # Load track-scoped history if track is active - if self.active_track: - track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) - if track_history: - with self._disc_entries_lock: - self.disc_entries = parse_history_entries(track_history, self.disc_roles) + super().__setattr__(name, value) - def _cb_load_track(self, track_id: str) -> None: - state = project_manager.load_track_state(track_id, self.ui_files_base_dir) - if state: - try: - # Convert list[Ticket] or list[dict] to list[Ticket] for Track object - tickets = [] - for t in state.tasks: - if isinstance(t, dict): - tickets.append(Ticket(**t)) - else: - tickets.append(t) - self.active_track = Track( - id=state.metadata.id, - description=state.metadata.name, - tickets=tickets - ) - # Keep dicts for UI table (or convert Ticket objects back to dicts if needed) - from dataclasses import asdict - self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] - # Load track-scoped history - history = project_manager.load_track_history(track_id, self.ui_files_base_dir) - with self._disc_entries_lock: - if history: - self.disc_entries = parse_history_entries(history, self.disc_roles) - else: - self.disc_entries = [] - self._recalculate_session_usage() - self.ai_status = f"Loaded 
track: {state.metadata.name}" - except Exception as e: - self.ai_status = f"Load track error: {e}" - print(f"Error loading track {track_id}: {e}") + @property + def current_provider(self) -> str: + return self.controller.current_provider - def _save_active_project(self) -> None: - if self.active_project_path: - try: - project_manager.save_project(self.project, self.active_project_path) - except Exception as e: - self.ai_status = f"save error: {e}" - # ---------------------------------------------------------------- discussion management + @current_provider.setter + def current_provider(self, value: str) -> None: + self.controller.current_provider = value - def _get_discussion_names(self) -> list[str]: - if self._discussion_names_dirty: - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - self._discussion_names_cache = sorted(discussions.keys()) - self._discussion_names_dirty = False - return self._discussion_names_cache + @property + def current_model(self) -> str: + return self.controller.current_model - def _switch_discussion(self, name: str) -> None: - self._flush_disc_entries_to_project() - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if name not in discussions: - self.ai_status = f"discussion not found: {name}" - return - self.active_discussion = name - self.active_discussion_idx = -1 - discussions_root = self.project.get("discussions", []) - for i, d in enumerate(discussions_root): - if isinstance(d, dict) and d.get("title") == name: - self.active_discussion_idx = i - break - self._track_discussion_active = False - disc_sec["active"] = name - self._discussion_names_dirty = True - disc_data = discussions[name] - with self._disc_entries_lock: - self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles) - self.ai_status = f"discussion: {name}" + @current_model.setter + def current_model(self, value: str) -> None: + 
self.controller.current_model = value - def _flush_disc_entries_to_project(self) -> None: - history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] - if self.active_track and self._track_discussion_active: - project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) - return - disc_sec = self.project.setdefault("discussion", {}) - discussions = disc_sec.setdefault("discussions", {}) - disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) - disc_data["history"] = history_strings - disc_data["last_updated"] = project_manager.now_ts() + # ---------------------------------------------------------------- project loading - def _create_discussion(self, name: str) -> None: - disc_sec = self.project.setdefault("discussion", {}) - discussions = disc_sec.setdefault("discussions", {}) - if name in discussions: - self.ai_status = f"discussion '{name}' already exists" - return - discussions[name] = project_manager.default_discussion() - self._discussion_names_dirty = True - self._switch_discussion(name) - - def _rename_discussion(self, old_name: str, new_name: str) -> None: - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if old_name not in discussions: - return - if new_name in discussions: - self.ai_status = f"discussion '{new_name}' already exists" - return - discussions[new_name] = discussions.pop(old_name) - self._discussion_names_dirty = True - if self.active_discussion == old_name: - self.active_discussion = new_name - disc_sec["active"] = new_name - - def _delete_discussion(self, name: str) -> None: - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if len(discussions) <= 1: - self.ai_status = "cannot delete the last discussion" - return - if name not in discussions: - return - del discussions[name] - self._discussion_names_dirty = True - if self.active_discussion == name: 
- remaining = sorted(discussions.keys()) - self._switch_discussion(remaining[0]) - # ---------------------------------------------------------------- logic - - def _on_comms_entry(self, entry: dict[str, Any]) -> None: - # sys.stderr.write(f"[DEBUG] _on_comms_entry: {entry.get('kind')} {entry.get('direction')}\n") - session_logger.log_comms(entry) - entry["local_ts"] = time.time() - kind = entry.get("kind") - payload = entry.get("payload", {}) - if kind in ("tool_result", "tool_call"): - role = "Tool" if kind == "tool_result" else "Vendor API" - content = "" - if kind == "tool_result": - content = payload.get("output", "") - else: - content = payload.get("script") or payload.get("args") or payload.get("message", "") - if isinstance(content, dict): - content = json.dumps(content, indent=1) - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": role, - "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", - "collapsed": True, - "ts": entry.get("ts", project_manager.now_ts()) - }) - # If this is a history_add kind, route it to history queue instead - if kind == "history_add": - payload = entry.get("payload", {}) - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": payload.get("role", "AI"), - "content": payload.get("content", ""), - "collapsed": payload.get("collapsed", False), - "ts": entry.get("ts", project_manager.now_ts()) - }) - return - with self._pending_comms_lock: - self._pending_comms.append(entry) - - def _on_tool_log(self, script: str, result: str) -> None: - session_logger.log_tool_call(script, result, None) - source_tier = ai_client.current_tier - with self._pending_tool_calls_lock: - self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) - - def _on_api_event(self, *args: Any, **kwargs: Any) -> None: - payload = kwargs.get("payload", {}) - with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append({"action": 
"refresh_api_metrics", "payload": payload}) - - def _on_performance_alert(self, message: str) -> None: - """Called by PerformanceMonitor when a threshold is exceeded.""" - alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." - # Inject into history as a 'System' message - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": "System", - "content": alert_text, - "ts": project_manager.now_ts() - }) + # ---------------------------------------------------------------- logic def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: @@ -910,210 +347,9 @@ class App: with self._disc_entries_lock: self.disc_entries.append(item) - def _handle_approve_script(self) -> None: - """Logic for approving a pending script via API hooks.""" - print("[DEBUG] _handle_approve_script called") - with self._pending_dialog_lock: - if self._pending_dialog: - print(f"[DEBUG] Approving dialog for: {self._pending_dialog._script[:50]}...") - with self._pending_dialog._condition: - self._pending_dialog._approved = True - self._pending_dialog._done = True - self._pending_dialog._condition.notify_all() - self._pending_dialog = None - else: - print("[DEBUG] No pending dialog to approve") - - def _handle_reject_script(self) -> None: - """Logic for rejecting a pending script via API hooks.""" - print("[DEBUG] _handle_reject_script called") - with self._pending_dialog_lock: - if self._pending_dialog: - print(f"[DEBUG] Rejecting dialog for: {self._pending_dialog._script[:50]}...") - with self._pending_dialog._condition: - self._pending_dialog._approved = False - self._pending_dialog._done = True - self._pending_dialog._condition.notify_all() - self._pending_dialog = None - else: - print("[DEBUG] No pending dialog to reject") - - def _handle_approve_tool(self) -> None: - """Logic for approving a pending tool execution via API hooks.""" - print("[DEBUG] _handle_approve_tool called") - if 
self._pending_ask_dialog: - self._handle_approve_ask() - else: - print("[DEBUG] No pending tool approval found") - - def _handle_approve_mma_step(self) -> None: - """Logic for approving a pending MMA step execution via API hooks.""" - print("[DEBUG] _handle_approve_mma_step called") - if self._pending_mma_approval: - self._handle_mma_respond(approved=True, payload=self._mma_approval_payload) - self._mma_approval_open = False - self._pending_mma_approval = None - else: - print("[DEBUG] No pending MMA step approval found") - - def _handle_approve_spawn(self) -> None: - """Logic for approving a pending sub-agent spawn via API hooks.""" - print("[DEBUG] _handle_approve_spawn called") - if self._pending_mma_spawn: - # Synchronize with the handler logic - self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context) - # Crucially, close the modal state so UI can continue - self._mma_spawn_open = False - self._pending_mma_spawn = None - else: - print("[DEBUG] No pending spawn approval found") - - def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: - if self._pending_mma_approval: - dlg = self._pending_mma_approval.get("dialog_container", [None])[0] - if dlg: - with dlg._condition: - dlg._approved = approved - if payload is not None: - dlg._payload = payload - dlg._done = True - dlg._condition.notify_all() - self._pending_mma_approval = None - if self._pending_mma_spawn: - spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0] - if spawn_dlg: - with spawn_dlg._condition: - spawn_dlg._approved = approved - spawn_dlg._abort = abort - if prompt is not None: - spawn_dlg._prompt = prompt - if context_md is not None: - spawn_dlg._context_md = context_md - spawn_dlg._done = True - spawn_dlg._condition.notify_all() - self._pending_mma_spawn = None - - def _handle_approve_ask(self) -> None: - """Responds with 
approval for a pending /api/ask request.""" - if not self._ask_request_id: return - request_id = self._ask_request_id - - def do_post() -> None: - try: - requests.post( - "http://127.0.0.1:8999/api/ask/respond", - json={"request_id": request_id, "response": {"approved": True}}, - timeout=2 - ) - except Exception as e: print(f"Error responding to ask: {e}") - threading.Thread(target=do_post, daemon=True).start() - self._pending_ask_dialog = False - self._ask_request_id = None - self._ask_tool_data = None - - def _handle_reject_ask(self) -> None: - """Responds with rejection for a pending /api/ask request.""" - if not self._ask_request_id: return - request_id = self._ask_request_id - - def do_post() -> None: - try: - requests.post( - "http://127.0.0.1:8999/api/ask/respond", - json={"request_id": request_id, "response": {"approved": False}}, - timeout=2 - ) - except Exception as e: print(f"Error responding to ask: {e}") - threading.Thread(target=do_post, daemon=True).start() - self._pending_ask_dialog = False - self._ask_request_id = None - self._ask_tool_data = None - - def _handle_reset_session(self) -> None: - """Logic for resetting the AI session.""" - ai_client.reset_session() - ai_client.clear_comms_log() - self._tool_log.clear() - self._comms_log.clear() - self.disc_entries.clear() - # Clear history in project dict too - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if self.active_discussion in discussions: - discussions[self.active_discussion]["history"] = [] - self.ai_status = "session reset" - self.ai_response = "" - self.ui_ai_input = "" - with self._pending_history_adds_lock: - self._pending_history_adds.clear() - - def _handle_md_only(self) -> None: - """Logic for the 'MD Only' action.""" - try: - md, path, *_ = self._do_generate() - self.last_md = md - self.last_md_path = path - self.ai_status = f"md written: {path.name}" - # Refresh token budget metrics with CURRENT md - self._refresh_api_metrics({}, 
md_content=md) - except Exception as e: - self.ai_status = f"error: {e}" - - def _handle_generate_send(self) -> None: - """Logic for the 'Gen + Send' action.""" - try: - md, path, file_items, stable_md, disc_text = self._do_generate() - self._last_stable_md = stable_md - self.last_md = md - self.last_md_path = path - self.last_file_items = file_items - except Exception as e: - self.ai_status = f"generate error: {e}" - return - self.ai_status = "sending..." - user_msg = self.ui_ai_input - base_dir = self.ui_files_base_dir - # Prepare event payload - event_payload = events.UserRequestEvent( - prompt=user_msg, - stable_md=stable_md, - file_items=file_items, - disc_text=disc_text, - base_dir=base_dir - ) - # Push to async queue - asyncio.run_coroutine_threadsafe( - self.event_queue.put("user_request", event_payload), - self._loop - ) - - def _run_event_loop(self) -> None: - """Runs the internal asyncio event loop.""" - asyncio.set_event_loop(self._loop) - self._loop.create_task(self._process_event_queue()) - - # Fallback: process queues even if GUI thread is idling/stuck - async def queue_fallback() -> None: - while True: - try: - self._process_pending_gui_tasks() - self._process_pending_history_adds() - except: pass - await asyncio.sleep(0.1) - - self._loop.create_task(queue_fallback()) - self._loop.run_forever() - def shutdown(self) -> None: """Cleanly shuts down the app's background tasks and saves state.""" - if hasattr(self, 'hook_server'): - self.hook_server.stop() - if hasattr(self, 'perf_monitor'): - self.perf_monitor.stop() - if self._loop.is_running(): - self._loop.call_soon_threadsafe(self._loop.stop) - if self._loop_thread.is_alive(): - self._loop_thread.join(timeout=2.0) + self.controller.stop_services() # Join other threads if they exist if self.send_thread and self.send_thread.is_alive(): self.send_thread.join(timeout=1.0) @@ -1129,78 +365,6 @@ class App: save_config(self.config) except: pass - async def _process_event_queue(self) -> None: - """Listens 
for and processes events from the AsyncEventQueue.""" - while True: - event_name, payload = await self.event_queue.get() - if event_name == "user_request": - # Handle the request in a separate thread to avoid blocking the loop - self._loop.run_in_executor(None, self._handle_request_event, payload) - elif event_name == "response": - # Handle AI response event - with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append({ - "action": "handle_ai_response", - "payload": payload - }) - elif event_name == "mma_state_update": - with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append({ - "action": "mma_state_update", - "payload": payload - }) - elif event_name == "mma_stream": - with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append({ - "action": "mma_stream_append", - "payload": payload - }) - elif event_name in ("mma_spawn_approval", "mma_step_approval"): - # Route approval events to GUI tasks — payload already has the - # correct structure for _process_pending_gui_tasks handlers. 
- with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append(payload) - - def _handle_request_event(self, event: events.UserRequestEvent) -> None: - """Processes a UserRequestEvent by calling the AI client.""" - if self.ui_auto_add_history: - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": "User", - "content": event.prompt, - "collapsed": False, - "ts": project_manager.now_ts() - }) - csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) - ai_client.set_custom_system_prompt("\n\n".join(csp)) - ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) - ai_client.set_agent_tools(self.ui_agent_tools) - try: - resp = ai_client.send( - event.stable_md, - event.prompt, - event.base_dir, - event.file_items, - event.disc_text, - pre_tool_callback=self._confirm_and_run, - qa_callback=ai_client.run_tier4_analysis - ) - # Emit response event - asyncio.run_coroutine_threadsafe( - self.event_queue.put("response", {"text": resp, "status": "done"}), - self._loop - ) - except ProviderError as e: - asyncio.run_coroutine_threadsafe( - self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}), - self._loop - ) - except Exception as e: - asyncio.run_coroutine_threadsafe( - self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}), - self._loop - ) - def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" import os @@ -1208,224 +372,13 @@ class App: os.makedirs("tests/artifacts", exist_ok=True) with open("tests/artifacts/temp_callback_output.txt", "w") as f: f.write(data) - def _recalculate_session_usage(self) -> None: - usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} - for entry in 
ai_client.get_comms_log(): - if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): - u = entry["payload"]["usage"] - for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: - if k in usage: - usage[k] += u.get(k, 0) or 0 - self.session_usage = usage - def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: - if "latency" in payload: - self.session_usage["last_latency"] = payload["latency"] - self._recalculate_session_usage() - if md_content is not None: - stats = ai_client.get_token_stats(md_content) - # Ensure compatibility if keys are named differently - if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: - stats["estimated_prompt_tokens"] = stats["total_tokens"] - self._token_stats = stats - cache_stats = payload.get("cache_stats") - if cache_stats: - count = cache_stats.get("cache_count", 0) - size_bytes = cache_stats.get("total_size_bytes", 0) - self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" - def cb_load_prior_log(self) -> None: - root = hide_tk_root() - path = filedialog.askopenfilename( - title="Load Session Log", - initialdir="logs", - filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] - ) - root.destroy() - if not path: - return - entries = [] - try: - with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - try: - entries.append(json.loads(line)) - except json.JSONDecodeError: - continue - except Exception as e: - self.ai_status = f"log load error: {e}" - return - self.prior_session_entries = entries - self.is_viewing_prior_session = True - self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)" - def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None: - print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}") - 
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): - print(f"[DEBUG] test_hooks_enabled is True and ui_manual_approve is False; AUTO-APPROVING script execution in {base_dir}") - self.ai_status = "running powershell..." - output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) - self._append_tool_log(script, output) - self.ai_status = "powershell done, awaiting AI..." - return output - dialog = ConfirmDialog(script, base_dir) - is_headless = "--headless" in sys.argv - if is_headless: - with self._pending_dialog_lock: - self._pending_actions[dialog._uid] = dialog - print(f"[PENDING_ACTION] Created action {dialog._uid}") - else: - with self._pending_dialog_lock: - self._pending_dialog = dialog - # Notify API hook subscribers - if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): - print("[DEBUG] Pushing script_confirmation_required event to queue") - with self._api_event_queue_lock: - self._api_event_queue.append({ - "type": "script_confirmation_required", - "action_id": dialog._uid, - "script": str(script), - "base_dir": str(base_dir), - "ts": time.time() - }) - approved, final_script = dialog.wait() - if is_headless: - with self._pending_dialog_lock: - if dialog._uid in self._pending_actions: - del self._pending_actions[dialog._uid] - print(f"[DEBUG] _confirm_and_run result: approved={approved}") - if not approved: - self._append_tool_log(final_script, "REJECTED by user") - return None - self.ai_status = "running powershell..." - print(f"[DEBUG] Running powershell in {base_dir}") - output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) - self._append_tool_log(final_script, output) - self.ai_status = "powershell done, awaiting AI..." - return output - def resolve_pending_action(self, action_id: str, approved: bool) -> bool: - """Resolves a pending PowerShell script confirmation by its ID. - - Args: - action_id: The unique identifier for the pending action. 
- approved: True if the script should be executed, False otherwise. - - Returns: - bool: True if the action was found and resolved, False otherwise. - """ - with self._pending_dialog_lock: - if action_id in self._pending_actions: - dialog = self._pending_actions[action_id] - with dialog._condition: - dialog._approved = approved - dialog._done = True - dialog._condition.notify_all() - return True - elif self._pending_dialog and self._pending_dialog._uid == action_id: - dialog = self._pending_dialog - with dialog._condition: - dialog._approved = approved - dialog._done = True - dialog._condition.notify_all() - return True - return False - def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None: - self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) - self.ui_last_script_text = script - self.ui_last_script_output = result - self._trigger_script_blink = True - self.show_script_output = True - if self.ui_auto_scroll_tool_calls: - self._scroll_tool_calls_to_bottom = True - - def _flush_to_project(self) -> None: - proj = self.project - proj.setdefault("output", {})["output_dir"] = self.ui_output_dir - proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir - proj["files"]["paths"] = self.files - proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir - proj["screenshots"]["paths"] = self.screenshots - proj.setdefault("project", {}) - proj["project"]["git_dir"] = self.ui_project_git_dir - proj["project"]["system_prompt"] = self.ui_project_system_prompt - proj["project"]["main_context"] = self.ui_project_main_context - proj["project"]["word_wrap"] = self.ui_word_wrap - proj["project"]["summary_only"] = self.ui_summary_only - proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms - proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls - proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path - 
proj.setdefault("agent", {}).setdefault("tools", {}) - for t_name in AGENT_TOOL_NAMES: - proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) - self._flush_disc_entries_to_project() - disc_sec = proj.setdefault("discussion", {}) - disc_sec["roles"] = self.disc_roles - disc_sec["active"] = self.active_discussion - disc_sec["auto_add"] = self.ui_auto_add_history - # Save MMA State - mma_sec = proj.setdefault("mma", {}) - mma_sec["epic"] = self.ui_epic_input - if self.active_track: - # We only persist the basic metadata if full serialization is too complex - # For now, let's try full serialization via asdict - from dataclasses import asdict - mma_sec["active_track"] = asdict(self.active_track) - else: - mma_sec["active_track"] = None - - def _flush_to_config(self) -> None: - self.config["ai"] = { - "provider": self.current_provider, - "model": self.current_model, - "temperature": self.temperature, - "max_tokens": self.max_tokens, - "history_trunc_limit": self.history_trunc_limit, - } - self.config["ai"]["system_prompt"] = self.ui_global_system_prompt - self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} - self.config["gui"] = {"show_windows": self.show_windows} - theme.save_to_config(self.config) - - def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: - """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" - self._flush_to_project() - self._save_active_project() - self._flush_to_config() - save_config(self.config) - track_id = self.active_track.id if self.active_track else None - flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) - full_md, path, file_items = aggregate.run(flat) - # Build stable markdown (no history) for Gemini caching - screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) - screenshots = flat.get("screenshots", {}).get("paths", []) - summary_only = flat.get("project", 
{}).get("summary_only", False) - stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) - # Build discussion history text separately - history = flat.get("discussion", {}).get("history", []) - discussion_text = aggregate.build_discussion_text(history) - return full_md, path, file_items, stable_md, discussion_text - - def _fetch_models(self, provider: str) -> None: - self.ai_status = "fetching models..." - - def do_fetch() -> None: - try: - models = ai_client.list_models(provider) - self.available_models = models - if self.current_model not in models and models: - self.current_model = models[0] - ai_client.set_provider(self.current_provider, self.current_model) - self.ai_status = f"models loaded: {len(models)}" - except Exception as e: - self.ai_status = f"model fetch error: {e}" - self.models_thread = threading.Thread(target=do_fetch, daemon=True) - self.models_thread.start() # ---------------------------------------------------------------- helpers def _render_text_viewer(self, label: str, content: str) -> None: @@ -1990,186 +943,9 @@ class App: if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)): self._cb_plan_epic() - def _cb_plan_epic(self) -> None: - def _bg_task() -> None: - try: - self.ai_status = "Planning Epic (Tier 1)..." 
def _cb_accept_tracks(self) -> None:
    """Accept all proposed tracks: build code skeletons once, then start every track.

    Runs on a daemon thread so the GUI stays responsive.
    """
    self._show_track_proposal_modal = False

    def _bg_task() -> None:
        # Generate skeletons once and share them across all tracks.
        self.ai_status = "Phase 2: Generating skeletons for all tracks..."
        parser = ASTParser(language="python")
        generated_skeletons = ""
        try:
            for i, file_path in enumerate(self.files):
                try:
                    self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
                    abs_path = Path(self.ui_files_base_dir) / file_path
                    if abs_path.exists() and abs_path.suffix == ".py":
                        with open(abs_path, "r", encoding="utf-8") as f:
                            code = f.read()
                        # FIX: use real newlines as separators (was a literal "\\n" text).
                        generated_skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n"
                except Exception as e:
                    # A single unparseable file must not abort the whole scan.
                    print(f"Error parsing skeleton for {file_path}: {e}")
        except Exception as e:
            self.ai_status = f"Error generating skeletons: {e}"
            print(f"Error generating skeletons: {e}")
            return  # Exit if skeleton generation fails

        # Start each proposed track, reusing the shared skeletons.
        total_tracks = len(self.proposed_tracks)
        for i, track_data in enumerate(self.proposed_tracks):
            title = track_data.get("title") or track_data.get("goal", "Untitled Track")
            self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..."
            self._start_track_logic(track_data, skeletons_str=generated_skeletons)
        with self._pending_gui_tasks_lock:
            self._pending_gui_tasks.append({'action': 'refresh_from_project'})
        self.ai_status = f"All {total_tracks} tracks accepted and execution started."

    threading.Thread(target=_bg_task, daemon=True).start()

def _cb_start_track(self, user_data: Any = None) -> None:
    """Start a track by id (str), by proposal index (int/dict), or the active track.

    Args:
        user_data: track id string, proposal index, or {"index": int}.
    """
    if isinstance(user_data, str):
        # Track id supplied directly: ensure it is loaded as the active track.
        track_id = user_data
        if not self.active_track or self.active_track.id != track_id:
            self._cb_load_track(track_id)

    if self.active_track:
        # Run the active track via the conductor engine on the internal loop.
        self.mma_status = "running"
        engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode)
        flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
        full_md, _, _ = aggregate.run(flat)
        asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
        self.ai_status = f"Track '{self.active_track.description}' started."
        return

    # No active track: interpret user_data as an index into the proposals.
    idx = 0
    if isinstance(user_data, int):
        idx = user_data
    elif isinstance(user_data, dict):
        idx = user_data.get("index", 0)
    if 0 <= idx < len(self.proposed_tracks):
        track_data = self.proposed_tracks[idx]
        title = track_data.get("title") or track_data.get("goal", "Untitled Track")
        threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
        self.ai_status = f"Track '{title}' started."

def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
    """Generate tickets for a track via the Tech Lead, persist state, and start its engine.

    Args:
        track_data: proposal dict with at least "goal" and optionally "title".
        skeletons_str: pre-built code skeletons; generated from self.files when None.
    """
    try:
        goal = track_data.get("goal", "")
        title = track_data.get("title") or track_data.get("goal", "Untitled Track")
        self.ai_status = f"Phase 2: Generating tickets for {title}..."

        skeletons = ""
        if skeletons_str is None:
            # 1. Build code skeletons for Tech Lead context.
            parser = ASTParser(language="python")
            for i, file_path in enumerate(self.files):
                try:
                    self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
                    abs_path = Path(self.ui_files_base_dir) / file_path
                    if abs_path.exists() and abs_path.suffix == ".py":
                        with open(abs_path, "r", encoding="utf-8") as f:
                            code = f.read()
                        # FIX: use real newlines as separators (was a literal "\\n" text).
                        skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n"
                except Exception as e:
                    print(f"Error parsing skeleton for {file_path}: {e}")
        else:
            skeletons = skeletons_str

        # 2. Ask the Tech Lead for tickets, attributing token usage to Tier 2.
        self.ai_status = "Phase 2: Calling Tech Lead..."
        _t2_baseline = len(ai_client.get_comms_log())
        raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
        _t2_new = ai_client.get_comms_log()[_t2_baseline:]
        _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
        self.mma_tier_usage["Tier 2"]["input"] += sum(
            e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
        self.mma_tier_usage["Tier 2"]["output"] += sum(
            e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
        if not raw_tickets:
            self.ai_status = f"Error: No tickets generated for track: {title}"
            print(f"Warning: No tickets generated for track: {title}")
            return

        self.ai_status = "Phase 2: Sorting tickets..."
        try:
            sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets)
        except ValueError as e:
            # Dependency cycles fall back to the unsorted order rather than aborting.
            print(f"Dependency error in track '{title}': {e}")
            sorted_tickets_data = raw_tickets

        # 3. Create Track and Ticket objects.
        tickets = []
        for t_data in sorted_tickets_data:
            tickets.append(Ticket(
                id=t_data["id"],
                description=t_data.get("description") or t_data.get("goal", "No description"),
                status=t_data.get("status", "todo"),
                assigned_to=t_data.get("assigned_to", "unassigned"),
                depends_on=t_data.get("depends_on", []),
                step_mode=t_data.get("step_mode", False),
            ))
        # Deterministic id so re-planning the same title maps to the same track.
        track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}"
        track = Track(id=track_id, description=title, tickets=tickets)

        # Initialize track state in the filesystem.
        # FIX: datetime was imported and now() computed twice; single import/timestamp here.
        from models import TrackState, Metadata
        from datetime import datetime
        now = datetime.now()
        meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now)
        state = TrackState(metadata=meta, discussion=[], tasks=tickets)
        project_manager.save_track_state(track_id, state, self.ui_files_base_dir)

        # 4. Initialize the ConductorEngine and schedule its run loop.
        engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode)
        flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track.id)
        full_md, _, _ = aggregate.run(flat)
        asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
    except Exception as e:
        self.ai_status = f"Track start error: {e}"
        print(f"ERROR in _start_track_logic: {e}")
- abs_path = Path(self.ui_files_base_dir) / file_path - if abs_path.exists() and abs_path.suffix == ".py": - with open(abs_path, "r", encoding="utf-8") as f: - code = f.read() - skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n" - except Exception as e: - print(f"Error parsing skeleton for {file_path}: {e}") - else: - skeletons = skeletons_str # Use provided skeletons - - self.ai_status = "Phase 2: Calling Tech Lead..." - _t2_baseline = len(ai_client.get_comms_log()) - raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) - _t2_new = ai_client.get_comms_log()[_t2_baseline:] - _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] - _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) - _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) - self.mma_tier_usage["Tier 2"]["input"] += _t2_in - self.mma_tier_usage["Tier 2"]["output"] += _t2_out - if not raw_tickets: - self.ai_status = f"Error: No tickets generated for track: {title}" - print(f"Warning: No tickets generated for track: {title}") - return - self.ai_status = "Phase 2: Sorting tickets..." - try: - sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) - except ValueError as e: - print(f"Dependency error in track '{title}': {e}") - sorted_tickets_data = raw_tickets - # 3. 
Create Track and Ticket objects - from datetime import datetime - now = datetime.now() - tickets = [] - for t_data in sorted_tickets_data: - ticket = Ticket( - id=t_data["id"], - description=t_data.get("description") or t_data.get("goal", "No description"), - status=t_data.get("status", "todo"), - assigned_to=t_data.get("assigned_to", "unassigned"), - depends_on=t_data.get("depends_on", []), - step_mode=t_data.get("step_mode", False) - ) - tickets.append(ticket) - track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" - track = Track(id=track_id, description=title, tickets=tickets) - # Initialize track state in the filesystem - from models import TrackState, Metadata - from datetime import datetime - now = datetime.now() - meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now) - state = TrackState(metadata=meta, discussion=[], tasks=tickets) - project_manager.save_track_state(track_id, state, self.ui_files_base_dir) - # 4. 
Initialize ConductorEngine and run loop - engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) - # Use current full markdown context for the track execution - track_id_param = track.id - flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) - full_md, _, _ = aggregate.run(flat) - # Schedule the coroutine on the internal event loop - asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) - except Exception as e: - self.ai_status = f"Track start error: {e}" - print(f"ERROR in _start_track_logic: {e}") def _render_track_proposal_modal(self) -> None: if self._show_track_proposal_modal: @@ -2736,104 +1512,10 @@ class App: if is_blinking: imgui.pop_style_color(2) - def _cb_ticket_retry(self, ticket_id: str) -> None: - for t in self.active_tickets: - if t.get('id') == ticket_id: - t['status'] = 'todo' - break - asyncio.run_coroutine_threadsafe( - self.event_queue.put("mma_retry", {"ticket_id": ticket_id}), - self._loop - ) - def _cb_ticket_skip(self, ticket_id: str) -> None: - for t in self.active_tickets: - if t.get('id') == ticket_id: - t['status'] = 'skipped' - break - asyncio.run_coroutine_threadsafe( - self.event_queue.put("mma_skip", {"ticket_id": ticket_id}), - self._loop - ) - def _cb_run_conductor_setup(self) -> None: - base = Path("conductor") - if not base.exists(): - self.ui_conductor_setup_summary = "Error: conductor/ directory not found." 
def _cb_create_track(self, name: str, desc: str, track_type: str) -> None:
    """Scaffold a new conductor track (spec.md, plan.md, metadata.json) and reload tracks."""
    if not name:
        return
    from datetime import datetime
    date_suffix = datetime.now().strftime("%Y%m%d")
    track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}"
    track_dir = Path("conductor/tracks") / track_id
    track_dir.mkdir(parents=True, exist_ok=True)
    spec_file = track_dir / "spec.md"
    with open(spec_file, "w", encoding="utf-8") as f:
        f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n")
    plan_file = track_dir / "plan.md"
    with open(plan_file, "w", encoding="utf-8") as f:
        f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n")
    meta_file = track_dir / "metadata.json"
    import json
    with open(meta_file, "w", encoding="utf-8") as f:
        json.dump({
            "id": track_id,
            "title": name,
            "description": desc,
            "type": track_type,
            "status": "new",
            "progress": 0.0,
        }, f, indent=1)
    # Refresh tracks from disk so the new track appears immediately.
    self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)

def _push_mma_state_update(self) -> None:
    """Persist the active track's tickets and refreshed metadata back to disk."""
    if not self.active_track:
        return
    # Sync active_tickets (list of dicts) back into Ticket objects on the track.
    self.active_track.tickets = [Ticket.from_dict(t) for t in self.active_tickets]
    from project_manager import save_track_state, load_track_state
    from models import TrackState, Metadata
    from datetime import datetime

    existing = load_track_state(self.active_track.id, self.ui_files_base_dir)
    # Preserve the original creation time when prior state exists.
    meta = Metadata(
        id=self.active_track.id,
        name=self.active_track.description,
        status=self.mma_status,
        created_at=existing.metadata.created_at if existing else datetime.now(),
        updated_at=datetime.now(),
    )
    state = TrackState(
        metadata=meta,
        discussion=existing.discussion if existing else [],
        tasks=self.active_track.tickets,
    )
    save_track_state(self.active_track.id, state, self.ui_files_base_dir)
import os
import sys

# Ensure src/ is on the import path so gui_2 and its dependencies resolve.
project_root = os.path.dirname(os.path.abspath(__file__))
src_path = os.path.join(project_root, "src")
sys.path.insert(0, src_path)

# Mock the ImGui bundle so App can be constructed without a display.
# FIX: removed a duplicate `import sys` (already imported above).
from unittest.mock import MagicMock

sys.modules['imgui_bundle'] = MagicMock()
sys.modules['imgui_bundle.imgui'] = MagicMock()
sys.modules['imgui_bundle.hello_imgui'] = MagicMock()
sys.modules['imgui_bundle.immapp'] = MagicMock()

try:
    from gui_2 import App
    app = App()
    print("App instantiated successfully!")
    print(f"Controller active_project_path: {app.controller.active_project_path}")
    print(f"App config: {app.config is app.controller.config}")  # Should be True via __getattr__
except Exception as e:
    # Best-effort sanity script: report the failure with a traceback, don't crash.
    print(f"Failed to instantiate App: {e}")
    import traceback
    traceback.print_exc()