# gui_2.py from __future__ import annotations import tomli_w import threading import asyncio import time import math import json import sys import os import uuid import requests # type: ignore[import-untyped] from pathlib import Path from tkinter import filedialog, Tk from typing import Optional, Callable, Any import aggregate import ai_client import cost_tracker from ai_client import ProviderError import shell_runner import session_logger import project_manager import theme_2 as theme import tomllib import events import numpy as np import api_hooks import mcp_client import orchestrator_pm from performance_monitor import PerformanceMonitor from log_registry import LogRegistry from log_pruner import LogPruner import conductor_tech_lead import multi_agent_conductor from models import Track, Ticket from file_cache import ASTParser from fastapi import FastAPI, Depends, HTTPException from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel from imgui_bundle import imgui, hello_imgui, immapp CONFIG_PATH: Path = Path("config.toml") PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"] COMMS_CLAMP_CHARS: int = 300 def load_config() -> dict[str, Any]: with open(CONFIG_PATH, "rb") as f: return tomllib.load(f) def save_config(config: dict[str, Any]) -> None: with open(CONFIG_PATH, "wb") as f: tomli_w.dump(config, f) def hide_tk_root() -> Tk: root = Tk() root.withdraw() root.wm_attributes("-topmost", True) return root # Color Helpers def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4: return imgui.ImVec4(r/255, g/255, b/255, a) C_OUT: imgui.ImVec4 = vec4(100, 200, 255) C_IN: imgui.ImVec4 = vec4(140, 255, 160) C_REQ: imgui.ImVec4 = vec4(255, 220, 100) C_RES: imgui.ImVec4 = vec4(180, 255, 180) C_TC: imgui.ImVec4 = vec4(255, 180, 80) C_TR: imgui.ImVec4 = vec4(180, 220, 255) C_TRS: imgui.ImVec4 = vec4(200, 180, 255) C_LBL: imgui.ImVec4 = vec4(180, 180, 180) C_VAL: imgui.ImVec4 = vec4(220, 220, 220) C_KEY: imgui.ImVec4 = vec4(140, 200, 255) C_NUM: imgui.ImVec4 = vec4(180, 255, 180) C_SUB: imgui.ImVec4 = vec4(220, 200, 120) DIR_COLORS: dict[str, imgui.ImVec4] = {"OUT": C_OUT, "IN": C_IN} KIND_COLORS: dict[str, imgui.ImVec4] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS} HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"} DISC_ROLES: list[str] = ["User", "AI", "Vendor API", "System"] AGENT_TOOL_NAMES: list[str] = [ "run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url", "py_get_skeleton", "py_get_code_outline", "get_file_slice", "py_get_definition", "py_get_signature", "py_get_class_summary", "py_get_var_declaration", "get_git_diff", "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree", "get_ui_performance", # Mutating tools — disabled by default "set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration", ] def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]: if max_pairs <= 0: return [] count = 0 target = max_pairs * 2 for i in range(len(entries) - 1, -1, -1): role = entries[i].get("role", "") if role in ("User", "AI"): count += 1 if count == target: return entries[i:] return entries def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict[str, Any]]: known = roles if roles is not None else DISC_ROLES entries = [] for raw in history: entry = project_manager.str_to_entry(raw, known) entries.append(entry) return entries class ConfirmDialog: def __init__(self, script: str, base_dir: str) -> None: self._uid = str(uuid.uuid4()) self._script = str(script) if script is not None else "" self._base_dir = str(base_dir) if base_dir is not None else "" self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return self._approved, self._script class MMAApprovalDialog: def __init__(self, ticket_id: str, payload: str) -> None: self._payload = payload self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return self._approved, self._payload class MMASpawnApprovalDialog: def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: self._prompt = prompt self._context_md = context_md self._condition = threading.Condition() self._done = False self._approved = False self._abort = False def wait(self) -> dict[str, Any]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return { 'approved': self._approved, 'abort': self._abort, 'prompt': self._prompt, 'context_md': self._context_md } class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True temperature: float | None = None max_tokens: int | None = None class ConfirmRequest(BaseModel): approved: bool script: Optional[str] = None class App: """The main ImGui interface orchestrator for Manual Slop.""" def __init__(self) -> None: # Initialize locks first to avoid initialization order issues self._send_thread_lock: threading.Lock = threading.Lock() self._disc_entries_lock: threading.Lock = threading.Lock() self._pending_comms_lock: threading.Lock = threading.Lock() self._pending_tool_calls_lock: threading.Lock = threading.Lock() self._pending_history_adds_lock: threading.Lock = threading.Lock() self._pending_gui_tasks_lock: threading.Lock = threading.Lock() self._pending_dialog_lock: threading.Lock = threading.Lock() self._api_event_queue_lock: threading.Lock = threading.Lock() self.config: dict[str, Any] = load_config() self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue() self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() self._loop_thread: threading.Thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() ai_cfg = self.config.get("ai", {}) self._current_provider: str = ai_cfg.get("provider", "gemini") self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite") self.available_models: list[str] = [] self.temperature: float = ai_cfg.get("temperature", 0.0) self.max_tokens: int = ai_cfg.get("max_tokens", 8192) self.history_trunc_limit: int = ai_cfg.get("history_trunc_limit", 8000) projects_cfg = self.config.get("projects", {}) self.project_paths: list[str] = list(projects_cfg.get("paths", [])) self.active_project_path: str = projects_cfg.get("active", "") self.project: dict[str, Any] = {} self.active_discussion: str = "main" self._load_active_project() self.files: list[str] = list(self.project.get("files", {}).get("paths", [])) self.screenshots: list[str] = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries: list[dict[str, Any]] = _parse_history_entries(disc_data.get("history", []), self.disc_roles) self.ui_output_dir: str = self.project.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir: str = self.project.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir: str = self.project.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir: str = proj_meta.get("git_dir", "") self.ui_project_main_context: str = proj_meta.get("main_context", "") self.ui_project_system_prompt: str = proj_meta.get("system_prompt", "") self.ui_gemini_cli_path: str = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_word_wrap: bool = proj_meta.get("word_wrap", True) self.ui_summary_only: bool = proj_meta.get("summary_only", False) self.ui_auto_add_history: bool = disc_sec.get("auto_add", False) self.ui_global_system_prompt: str = self.config.get("ai", {}).get("system_prompt", "") self.ui_ai_input: str = "" self.ui_disc_new_name_input: str = "" self.ui_disc_new_role_input: str = "" self.ui_epic_input: str = "" self.proposed_tracks: list[dict[str, Any]] = [] self._show_track_proposal_modal: bool = False self.ui_new_track_name: str = "" self.ui_new_track_desc: str = "" self.ui_new_track_type: str = "feature" self.ui_conductor_setup_summary: str = "" self.ui_last_script_text: str = "" self.ui_last_script_output: str = "" self.ai_status: str = "idle" self.ai_response: str = "" self.last_md: str = "" self.last_md_path: Path | None = None self.last_file_items: list[Any] = [] self.send_thread: threading.Thread | None = None self.models_thread: threading.Thread | None = None _default_windows = { "Context Hub": True, "Files & Media": True, "AI Settings": True, "MMA Dashboard": True, "Tier 1: Strategy": True, "Tier 2: Tech Lead": True, "Tier 3: Workers": True, "Tier 4: QA": True, "Discussion Hub": True, "Operations Hub": True, "Theme": True, "Log Management": False, "Diagnostics": False, } saved = self.config.get("gui", {}).get("show_windows", {}) self.show_windows: dict[str, bool] = {k: saved.get(k, v) for k, v in _default_windows.items()} self.show_script_output: bool = False self.show_text_viewer: bool = False self.text_viewer_title: str = "" self.text_viewer_content: str = "" self._pending_dialog: ConfirmDialog | None = None self._pending_dialog_open: bool = False self._pending_actions: dict[str, ConfirmDialog] = {} self._pending_ask_dialog: bool = False self._ask_dialog_open: bool = False self._ask_request_id: str | None = None self._ask_tool_data: dict[str, Any] | None = None self.mma_step_mode: bool = False self.active_track: Track | None = None self.active_tickets: list[dict[str, Any]] = [] self.active_tier: str | None = None self.ui_focus_agent: str | None = None self.mma_status: str = "idle" self._pending_mma_approval: dict[str, Any] | None = None self._mma_approval_open: bool = False self._mma_approval_edit_mode: bool = False self._mma_approval_payload: str = "" self._pending_mma_spawn: dict[str, Any] | None = None self._mma_spawn_open: bool = False self._mma_spawn_edit_mode: bool = False self._mma_spawn_prompt: str = '' self._mma_spawn_context: str = '' self.mma_tier_usage: dict[str, dict[str, Any]] = { "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"}, "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"}, "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, } self._tool_log: list[dict[str, Any]] = [] self._comms_log: list[dict[str, Any]] = [] self._pending_comms: list[dict[str, Any]] = [] self._pending_tool_calls: list[dict[str, Any]] = [] self._pending_history_adds: list[dict[str, Any]] = [] self._trigger_blink: bool = False self._is_blinking: bool = False self._blink_start_time: float = 0.0 self._trigger_script_blink: bool = False self._is_script_blinking: bool = False self._script_blink_start_time: float = 0.0 self._scroll_disc_to_bottom: bool = False self._scroll_comms_to_bottom: bool = False self._scroll_tool_calls_to_bottom: bool = False self._pending_gui_tasks: list[dict[str, Any]] = [] self.session_usage: dict[str, Any] = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "last_latency": 0.0} self._gemini_cache_text: str = "" self._last_stable_md: str = '' self._token_stats: dict[str, Any] = {} self._token_stats_dirty: bool = False self.ui_disc_truncate_pairs: int = 2 self.ui_auto_scroll_comms: bool = True self.ui_auto_scroll_tool_calls: bool = True agent_tools_cfg = self.project.get("agent", {}).get("tools", {}) self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} self.tracks: list[dict[str, Any]] = [] self._show_add_ticket_form: bool = False self.ui_new_ticket_id: str = "" self.ui_new_ticket_desc: str = "" self.ui_new_ticket_target: str = "" self.ui_new_ticket_deps: str = "" self._track_discussion_active: bool = False self.mma_streams: dict[str, str] = {} self._tier_stream_last_len: dict[str, int] = {} self.is_viewing_prior_session: bool = False self.prior_session_entries: list[dict[str, Any]] = [] self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1") self.ui_manual_approve: bool = False self.perf_monitor: PerformanceMonitor = PerformanceMonitor() self.perf_history: dict[str, list[float]] = {"frame_time": [0.0]*100, "fps": [0.0]*100, "cpu": [0.0]*100, "input_lag": [0.0]*100} self._perf_last_update: float = 0.0 self._autosave_interval: float = 60.0 self._last_autosave: float = time.time() label = self.project.get("project", {}).get("name", "") session_logger.open_session(label=label) self._prune_old_logs() self._init_ai_and_hooks() def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" def run_prune() -> None: try: registry = LogRegistry("logs/log_registry.toml") pruner = LogPruner(registry, "logs") pruner.prune() except Exception as e: print(f"Error during log pruning: {e}") thread = threading.Thread(target=run_prune, daemon=True) thread.start() @property def current_provider(self) -> str: return self._current_provider @current_provider.setter def current_provider(self, value: str) -> None: if value != self._current_provider: self._current_provider = value ai_client.reset_session() ai_client.set_provider(value, self.current_model) if value == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path if hasattr(self, 'hook_server'): self.hook_server.start() self.available_models = [] self.available_models = [] self._fetch_models(value) self._token_stats = {} self._token_stats_dirty = True @property def current_model(self) -> str: return self._current_model @current_model.setter def current_model(self, value: str) -> None: if value != self._current_model: self._current_model = value ai_client.reset_session() ai_client.set_provider(self.current_provider, value) self._token_stats = {} self._token_stats_dirty = True def _init_ai_and_hooks(self) -> None: ai_client.set_provider(self.current_provider, self.current_model) if self.current_provider == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path ai_client.confirm_and_run_callback = self._confirm_and_run ai_client.comms_log_callback = self._on_comms_entry ai_client.tool_log_callback = self._on_tool_log mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics self.perf_monitor.alert_callback = self._on_performance_alert ai_client.events.on("request_start", self._on_api_event) ai_client.events.on("response_received", self._on_api_event) ai_client.events.on("tool_execution", self._on_api_event) self._settable_fields: dict[str, str] = { 'ai_input': 'ui_ai_input', 'project_git_dir': 'ui_project_git_dir', 'auto_add_history': 'ui_auto_add_history', 'disc_new_name_input': 'ui_disc_new_name_input', 'project_main_context': 'ui_project_main_context', 'gcli_path': 'ui_gemini_cli_path', 'output_dir': 'ui_output_dir', 'files_base_dir': 'ui_files_base_dir', 'ai_status': 'ai_status', 'ai_response': 'ai_response', 'active_discussion': 'active_discussion', 'current_provider': 'current_provider', 'current_model': 'current_model', 'token_budget_pct': '_token_budget_pct', 'token_budget_current': '_token_budget_current', 'token_budget_label': '_token_budget_label', 'show_confirm_modal': 'show_confirm_modal', 'mma_epic_input': 'ui_epic_input', 'mma_status': 'mma_status', 'mma_active_tier': 'active_tier', 'ui_new_track_name': 'ui_new_track_name', 'ui_new_track_desc': 'ui_new_track_desc', 'manual_approve': 'ui_manual_approve' } self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, 'btn_md_only': self._handle_md_only, 'btn_approve_script': self._handle_approve_script, 'btn_reject_script': self._handle_reject_script, 'btn_project_save': self._cb_project_save, 'btn_disc_create': self._cb_disc_create, 'btn_mma_plan_epic': self._cb_plan_epic, 'btn_mma_accept_tracks': self._cb_accept_tracks, 'btn_mma_start_track': self._cb_start_track, 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), 'btn_approve_tool': self._handle_approve_tool, 'btn_approve_mma_step': self._handle_approve_mma_step, 'btn_approve_spawn': self._handle_approve_spawn, } self._predefined_callbacks: dict[str, Callable[..., Any]] = { '_test_callback_func_write_to_file': self._test_callback_func_write_to_file } self._discussion_names_cache: list[str] = [] self._discussion_names_dirty: bool = True self.hook_server = api_hooks.HookServer(self) self.hook_server.start() def create_api(self) -> FastAPI: """Creates and configures the FastAPI application for headless mode.""" api = FastAPI(title="Manual Slop Headless API") API_KEY_NAME = "X-API-KEY" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(header_key: str = Depends(api_key_header)) -> str: """Validates the API key from the request header against configuration.""" headless_cfg = self.config.get("headless", {}) config_key = headless_cfg.get("api_key", "").strip() env_key = os.environ.get("SLOP_API_KEY", "").strip() target_key = env_key or config_key if not target_key: raise HTTPException(status_code=403, detail="API Key not configured on server") if header_key == target_key: return header_key raise HTTPException(status_code=403, detail="Could not validate API Key") @api.get("/health") def health() -> dict[str, str]: """Returns the health status of the API.""" return {"status": "ok"} @api.get("/status", dependencies=[Depends(get_api_key)]) def status() -> dict[str, Any]: """Returns the current status of the application.""" return { "provider": self.current_provider, "model": self.current_model, "status": self.ai_status, "usage": self.session_usage } @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) def generate(req: GenerateRequest) -> dict[str, Any]: """Triggers an AI generation request using the current project context.""" if not req.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") with self._send_thread_lock: start_time = time.time() try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") user_msg = req.prompt base_dir = self.ui_files_base_dir csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) temp = req.temperature if req.temperature is not None else self.temperature tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens ai_client.set_model_params(temp, tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts() }) try: resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts() }) self._recalculate_session_usage() duration = time.time() - start_time return { "text": resp, "metadata": { "provider": self.current_provider, "model": self.current_model, "duration_sec": round(duration, 3), "timestamp": project_manager.now_ts() }, "usage": self.session_usage } except ProviderError as e: raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") except Exception as e: raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) async def stream(req: GenerateRequest) -> Any: """Placeholder for streaming AI generation responses (Not yet implemented).""" raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) def pending_actions() -> list[dict[str, Any]]: """Lists all pending PowerShell scripts awaiting confirmation.""" with self._pending_dialog_lock: return [ {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} for uid, diag in self._pending_actions.items() ] @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: """Approves or rejects a pending action.""" with self._pending_dialog_lock: if action_id not in self._pending_actions: raise HTTPException(status_code=404, detail="Action not found") dialog = self._pending_actions.pop(action_id) if req.script is not None: dialog._script = req.script with dialog._condition: dialog._approved = req.approved dialog._done = True dialog._condition.notify_all() return {"status": "confirmed" if req.approved else "rejected"} @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) def list_sessions() -> list[str]: """Lists all session log files.""" log_dir = Path("logs") if not log_dir.exists(): return [] return [f.name for f in log_dir.glob("*.log")] @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def get_session(session_id: str) -> dict[str, Any]: """Returns the content of a specific session log.""" log_path = Path("logs") / session_id if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def delete_session(session_id: str) -> dict[str, str]: """Deletes a specific session log.""" log_path = Path("logs") / session_id if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") log_path.unlink() return {"status": "deleted"} @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) def get_context() -> dict[str, Any]: """Returns the current aggregated project context.""" try: md, path, file_items, stable_md, disc_text = self._do_generate() # Pull current screenshots if available in project screenshots = self.project.get("screenshots", {}).get("paths", []) return { "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], "screenshots": screenshots, "files_base_dir": self.ui_files_base_dir, "markdown": md, "discussion": disc_text } except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) def token_stats() -> dict[str, Any]: """Returns current token usage and budget statistics.""" return self._token_stats return api # ---------------------------------------------------------------- project loading def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem proj = project_manager.default_project(name) project_manager.save_project(proj, user_data) if user_data not in self.project_paths: self.project_paths.append(user_data) self._switch_project(user_data) def _cb_project_save(self) -> None: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" def _cb_disc_create(self) -> None: nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm) self.ui_disc_new_name_input = "" def _load_active_project(self) -> None: if self.active_project_path and Path(self.active_project_path).exists(): try: self.project = project_manager.load_project(self.active_project_path) return except Exception as e: print(f"Failed to load project {self.active_project_path}: {e}") for pp in self.project_paths: if Path(pp).exists(): try: self.project = project_manager.load_project(pp) self.active_project_path = pp return except Exception: continue self.project = project_manager.migrate_from_legacy_config(self.config) name = self.project.get("project", {}).get("name", "project") fallback_path = f"{name}.toml" project_manager.save_project(self.project, fallback_path) self.active_project_path = fallback_path if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) def _switch_project(self, path: str) -> None: if not Path(path).exists(): self.ai_status = f"project file not found: {path}" return self._flush_to_project() self._save_active_project() try: self.project = project_manager.load_project(path) self.active_project_path = path except Exception as e: self.ai_status = f"failed to load project: {e}" return self._refresh_from_project() self._discussion_names_dirty = True ai_client.reset_session() self.ai_status = f"switched to: {Path(path).stem}" def _refresh_from_project(self) -> None: self.files = list(self.project.get("files", {}).get("paths", [])) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles) proj = self.project self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "") self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "") self.ui_project_main_context = proj.get("project", {}).get("main_context", "") self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) self.ui_summary_only = proj.get("project", {}).get("summary_only", False) agent_tools_cfg = proj.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} # MMA Tracks self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) # Restore MMA state mma_sec = proj.get("mma", {}) self.ui_epic_input = mma_sec.get("epic", "") at_data = mma_sec.get("active_track") if at_data: try: tickets = [] for t_data in at_data.get("tickets", []): tickets.append(Ticket(**t_data)) self.active_track = Track( id=at_data.get("id"), description=at_data.get("description"), tickets=tickets ) self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table except Exception as e: print(f"Failed to deserialize active track: {e}") self.active_track = None else: self.active_track = None self.active_tickets = [] # Load track-scoped history if track is active if self.active_track: track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) if track_history: with self._disc_entries_lock: self.disc_entries = _parse_history_entries(track_history, self.disc_roles) def _cb_load_track(self, track_id: str) -> None: state = project_manager.load_track_state(track_id, self.ui_files_base_dir) if state: try: # Convert list[Ticket] or list[dict] to list[Ticket] for Track object tickets = [] for t in state.tasks: if isinstance(t, dict): tickets.append(Ticket(**t)) else: tickets.append(t) self.active_track = Track( id=state.metadata.id, description=state.metadata.name, tickets=tickets ) # Keep dicts for UI table (or convert Ticket objects back to dicts if needed) from dataclasses import asdict self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] # Load track-scoped history history = project_manager.load_track_history(track_id, self.ui_files_base_dir) with self._disc_entries_lock: if history: self.disc_entries = _parse_history_entries(history, self.disc_roles) else: self.disc_entries = [] self._recalculate_session_usage() self.ai_status = f"Loaded track: {state.metadata.name}" except Exception as e: self.ai_status = f"Load track error: {e}" print(f"Error loading track {track_id}: {e}") def _save_active_project(self) -> None: if self.active_project_path: try: project_manager.save_project(self.project, self.active_project_path) except Exception as e: self.ai_status = f"save error: {e}" # ---------------------------------------------------------------- discussion management def _get_discussion_names(self) -> list[str]: if self._discussion_names_dirty: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) self._discussion_names_cache = sorted(discussions.keys()) self._discussion_names_dirty = False return self._discussion_names_cache def _switch_discussion(self, name: str) -> None: self._flush_disc_entries_to_project() disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if name not in discussions: self.ai_status = f"discussion not found: {name}" return self.active_discussion = name self.active_discussion_idx = -1 discussions_root = self.project.get("discussions", []) for i, d in enumerate(discussions_root): if isinstance(d, dict) and d.get("title") == name: self.active_discussion_idx = i break self._track_discussion_active = False disc_sec["active"] = name self._discussion_names_dirty = True disc_data = discussions[name] with self._disc_entries_lock: self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles) self.ai_status = f"discussion: {name}" def _flush_disc_entries_to_project(self) -> None: history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] if self.active_track and self._track_discussion_active: project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) return disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) disc_data["history"] = history_strings disc_data["last_updated"] = project_manager.now_ts() def _create_discussion(self, name: str) -> None: disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) if name in discussions: self.ai_status = f"discussion '{name}' already exists" return discussions[name] = project_manager.default_discussion() self._discussion_names_dirty = True self._switch_discussion(name) def _rename_discussion(self, old_name: str, new_name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if old_name not in discussions: return if new_name in discussions: self.ai_status = f"discussion '{new_name}' already exists" return discussions[new_name] = discussions.pop(old_name) self._discussion_names_dirty = True if self.active_discussion == old_name: self.active_discussion = new_name disc_sec["active"] = new_name def _delete_discussion(self, name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if len(discussions) <= 1: self.ai_status = "cannot delete the last discussion" return if name not in discussions: return del discussions[name] self._discussion_names_dirty = True if self.active_discussion == name: remaining = sorted(discussions.keys()) self._switch_discussion(remaining[0]) # ---------------------------------------------------------------- logic def _on_comms_entry(self, entry: dict[str, Any]) -> None: # sys.stderr.write(f"[DEBUG] _on_comms_entry: {entry.get('kind')} {entry.get('direction')}\n") session_logger.log_comms(entry) entry["local_ts"] = time.time() kind = entry.get("kind") payload = entry.get("payload", {}) if kind in ("tool_result", "tool_call"): role = "Tool" if kind == "tool_result" else "Vendor API" content = "" if kind == "tool_result": content = payload.get("output", "") else: content = payload.get("script") or payload.get("args") or payload.get("message", "") if isinstance(content, dict): content = json.dumps(content, indent=1) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", "collapsed": True, "ts": entry.get("ts", project_manager.now_ts()) }) # If this is a history_add kind, route it to history queue instead if kind == "history_add": payload = entry.get("payload", {}) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": payload.get("role", "AI"), "content": payload.get("content", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) return with self._pending_comms_lock: self._pending_comms.append(entry) def _on_tool_log(self, script: str, result: str) -> None: session_logger.log_tool_call(script, result, None) source_tier = ai_client.current_tier with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) def _on_api_event(self, *args: Any, **kwargs: Any) -> None: payload = kwargs.get("payload", {}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload}) def _on_performance_alert(self, message: str) -> None: """Called by PerformanceMonitor when a threshold is exceeded.""" alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." # Inject into history as a 'System' message with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "System", "content": alert_text, "ts": project_manager.now_ts() }) def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: return with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] self._pending_gui_tasks.clear() for task in tasks: try: action = task.get("action") if action == "refresh_api_metrics": self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None) elif action == "handle_ai_response": payload = task.get("payload", {}) text = payload.get("text", "") stream_id = payload.get("stream_id") is_streaming = payload.get("status") == "streaming..." if stream_id: if is_streaming: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" self.mma_streams[stream_id] += text else: self.mma_streams[stream_id] = text if stream_id == "Tier 1": if "status" in payload: self.ai_status = payload["status"] else: if is_streaming: self.ai_response += text else: self.ai_response = text self.ai_status = payload.get("status", "done") self._trigger_blink = True if not stream_id: self._token_stats_dirty = True if self.ui_auto_add_history and not stream_id: role = payload.get("role", "AI") with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts() }) elif action == "mma_stream_append": payload = task.get("payload", {}) stream_id = payload.get("stream_id") text = payload.get("text", "") if stream_id: if stream_id not in self.mma_streams: self.mma_streams[stream_id] = "" self.mma_streams[stream_id] += text elif action == "show_track_proposal": self.proposed_tracks = task.get("payload", []) self._show_track_proposal_modal = True elif action == "mma_state_update": payload = task.get("payload", {}) self.mma_status = payload.get("status", "idle") self.active_tier = payload.get("active_tier") self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage) self.active_tickets = payload.get("tickets", []) track_data = payload.get("track") if track_data: tickets = [] for t_data in self.active_tickets: tickets.append(Ticket(**t_data)) self.active_track = Track( id=track_data.get("id"), description=track_data.get("title", ""), tickets=tickets ) elif action == "set_value": item = task.get("item") value = task.get("value") if item in self._settable_fields: attr_name = self._settable_fields[item] setattr(self, attr_name, value) if item == "gcli_path": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(value)) else: ai_client._gemini_cli_adapter.binary_path = str(value) elif action == "click": item = task.get("item") user_data = task.get("user_data") if item == "btn_project_new_automated": self._cb_new_project_automated(user_data) elif item == "btn_mma_load_track": self._cb_load_track(str(user_data or "")) elif item in self._clickable_actions: # Check if it's a method that accepts user_data import inspect func = self._clickable_actions[item] try: sig = inspect.signature(func) if 'user_data' in sig.parameters: func(user_data=user_data) else: func() except Exception: func() elif action == "select_list_item": item = task.get("listbox", task.get("item")) value = task.get("item_value", task.get("value")) if item == "disc_listbox": self._switch_discussion(str(value or "")) elif task.get("type") == "ask": self._pending_ask_dialog = True self._ask_request_id = task.get("request_id") self._ask_tool_data = task.get("data", {}) elif action == "clear_ask": if self._ask_request_id == task.get("request_id"): self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None elif action == "custom_callback": cb = task.get("callback") args = task.get("args", []) if callable(cb): try: cb(*args) except Exception as e: print(f"Error in direct custom callback: {e}") elif cb in self._predefined_callbacks: self._predefined_callbacks[cb](*args) elif action == "mma_step_approval": dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or "")) self._pending_mma_approval = task if "dialog_container" in task: task["dialog_container"][0] = dlg elif action == 'refresh_from_project': self._refresh_from_project() elif action == "mma_spawn_approval": spawn_dlg = MMASpawnApprovalDialog( str(task.get("ticket_id") or ""), str(task.get("role") or ""), str(task.get("prompt") or ""), str(task.get("context_md") or "") ) self._pending_mma_spawn = task self._mma_spawn_prompt = task.get("prompt", "") self._mma_spawn_context = task.get("context_md", "") self._mma_spawn_open = True self._mma_spawn_edit_mode = False if "dialog_container" in task: task["dialog_container"][0] = spawn_dlg except Exception as e: print(f"Error executing GUI task: {e}") def _process_pending_history_adds(self) -> None: """Synchronizes pending history entries to the active discussion and project state.""" with self._pending_history_adds_lock: items = self._pending_history_adds[:] self._pending_history_adds.clear() if not items: return self._scroll_disc_to_bottom = True for item in items: item.get("role", "unknown") if item.get("role") and item["role"] not in self.disc_roles: self.disc_roles.append(item["role"]) disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) disc_data = discussions.get(self.active_discussion) if disc_data is not None: if item.get("disc_title", self.active_discussion) == self.active_discussion: if self.disc_entries is not disc_data.get("history"): if "history" not in disc_data: disc_data["history"] = [] disc_data["history"].append(project_manager.entry_to_str(item)) disc_data["last_updated"] = project_manager.now_ts() with self._disc_entries_lock: self.disc_entries.append(item) def _handle_approve_script(self) -> None: """Logic for approving a pending script via API hooks.""" print("[DEBUG] _handle_approve_script called") with self._pending_dialog_lock: if self._pending_dialog: print(f"[DEBUG] Approving dialog for: {self._pending_dialog._script[:50]}...") with self._pending_dialog._condition: self._pending_dialog._approved = True self._pending_dialog._done = True self._pending_dialog._condition.notify_all() self._pending_dialog = None else: print("[DEBUG] No pending dialog to approve") def _handle_reject_script(self) -> None: """Logic for rejecting a pending script via API hooks.""" print("[DEBUG] _handle_reject_script called") with self._pending_dialog_lock: if self._pending_dialog: print(f"[DEBUG] Rejecting dialog for: {self._pending_dialog._script[:50]}...") with self._pending_dialog._condition: self._pending_dialog._approved = False self._pending_dialog._done = True self._pending_dialog._condition.notify_all() self._pending_dialog = None else: print("[DEBUG] No pending dialog to reject") def _handle_approve_tool(self) -> None: """Logic for approving a pending tool execution via API hooks.""" print("[DEBUG] _handle_approve_tool called") if self._pending_ask_dialog: self._handle_approve_ask() else: print("[DEBUG] No pending tool approval found") def _handle_approve_mma_step(self) -> None: """Logic for approving a pending MMA step execution via API hooks.""" print("[DEBUG] _handle_approve_mma_step called") if self._pending_mma_approval: self._handle_mma_respond(approved=True, payload=self._mma_approval_payload) self._mma_approval_open = False self._pending_mma_approval = None else: print("[DEBUG] No pending MMA step approval found") def _handle_approve_spawn(self) -> None: """Logic for approving a pending sub-agent spawn via API hooks.""" print("[DEBUG] _handle_approve_spawn called") if self._pending_mma_spawn: # Synchronize with the handler logic self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context) # Crucially, close the modal state so UI can continue self._mma_spawn_open = False self._pending_mma_spawn = None else: print("[DEBUG] No pending spawn approval found") def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: if self._pending_mma_approval: dlg = self._pending_mma_approval.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved if payload is not None: dlg._payload = payload dlg._done = True dlg._condition.notify_all() self._pending_mma_approval = None if self._pending_mma_spawn: spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0] if spawn_dlg: with spawn_dlg._condition: spawn_dlg._approved = approved spawn_dlg._abort = abort if prompt is not None: spawn_dlg._prompt = prompt if context_md is not None: spawn_dlg._context_md = context_md spawn_dlg._done = True spawn_dlg._condition.notify_all() self._pending_mma_spawn = None def _handle_approve_ask(self) -> None: """Responds with approval for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": True}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reject_ask(self) -> None: """Responds with rejection for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": False}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reset_session(self) -> None: """Logic for resetting the AI session.""" ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.disc_entries.clear() # Clear history in project dict too disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if self.active_discussion in discussions: discussions[self.active_discussion]["history"] = [] self.ai_status = "session reset" self.ai_response = "" self.ui_ai_input = "" with self._pending_history_adds_lock: self._pending_history_adds.clear() def _handle_md_only(self) -> None: """Logic for the 'MD Only' action.""" try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" # Refresh token budget metrics with CURRENT md self._refresh_api_metrics({}, md_content=md) except Exception as e: self.ai_status = f"error: {e}" def _handle_generate_send(self) -> None: """Logic for the 'Gen + Send' action.""" try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: self.ai_status = f"generate error: {e}" return self.ai_status = "sending..." user_msg = self.ui_ai_input base_dir = self.ui_files_base_dir # Prepare event payload event_payload = events.UserRequestEvent( prompt=user_msg, stable_md=stable_md, file_items=file_items, disc_text=disc_text, base_dir=base_dir ) # Push to async queue asyncio.run_coroutine_threadsafe( self.event_queue.put("user_request", event_payload), self._loop ) def _run_event_loop(self) -> None: """Runs the internal asyncio event loop.""" asyncio.set_event_loop(self._loop) self._loop.create_task(self._process_event_queue()) # Fallback: process queues even if GUI thread is idling/stuck async def queue_fallback() -> None: while True: try: self._process_pending_gui_tasks() self._process_pending_history_adds() except: pass await asyncio.sleep(0.1) self._loop.create_task(queue_fallback()) self._loop.run_forever() def shutdown(self) -> None: """Cleanly shuts down the app's background tasks and saves state.""" if hasattr(self, 'hook_server'): self.hook_server.stop() if hasattr(self, 'perf_monitor'): self.perf_monitor.stop() if self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) # Join other threads if they exist if self.send_thread and self.send_thread.is_alive(): self.send_thread.join(timeout=1.0) if self.models_thread and self.models_thread.is_alive(): self.models_thread.join(timeout=1.0) # Final State persistence try: ai_client.cleanup() # Destroy active API caches to stop billing self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) except: pass async def _process_event_queue(self) -> None: """Listens for and processes events from the AsyncEventQueue.""" while True: event_name, payload = await self.event_queue.get() if event_name == "user_request": # Handle the request in a separate thread to avoid blocking the loop self._loop.run_in_executor(None, self._handle_request_event, payload) elif event_name == "response": # Handle AI response event with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": payload }) elif event_name == "mma_state_update": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_state_update", "payload": payload }) elif event_name == "mma_stream": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_stream_append", "payload": payload }) elif event_name in ("mma_spawn_approval", "mma_step_approval"): # Route approval events to GUI tasks — payload already has the # correct structure for _process_pending_gui_tasks handlers. with self._pending_gui_tasks_lock: self._pending_gui_tasks.append(payload) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """Processes a UserRequestEvent by calling the AI client.""" if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": event.prompt, "collapsed": False, "ts": project_manager.now_ts() }) csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) try: resp = ai_client.send( event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text, pre_tool_callback=self._confirm_and_run, qa_callback=ai_client.run_tier4_analysis ) # Emit response event asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": resp, "status": "done"}), self._loop ) except ProviderError as e: asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}), self._loop ) except Exception as e: asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}), self._loop ) def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" import os # Ensure the directory exists if running from a different cwd os.makedirs("tests/artifacts", exist_ok=True) with open("tests/artifacts/temp_callback_output.txt", "w") as f: f.write(data) def _recalculate_session_usage(self) -> None: usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} for entry in ai_client.get_comms_log(): if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): u = entry["payload"]["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in usage: usage[k] += u.get(k, 0) or 0 self.session_usage = usage def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: if "latency" in payload: self.session_usage["last_latency"] = payload["latency"] self._recalculate_session_usage() if md_content is not None: stats = ai_client.get_token_stats(md_content) # Ensure compatibility if keys are named differently if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: stats["estimated_prompt_tokens"] = stats["total_tokens"] self._token_stats = stats cache_stats = payload.get("cache_stats") if cache_stats: count = cache_stats.get("cache_count", 0) size_bytes = cache_stats.get("total_size_bytes", 0) self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" def cb_load_prior_log(self) -> None: root = hide_tk_root() path = filedialog.askopenfilename( title="Load Session Log", initialdir="logs", filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] ) root.destroy() if not path: return entries = [] try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: continue except Exception as e: self.ai_status = f"log load error: {e}" return self.prior_session_entries = entries self.is_viewing_prior_session = True self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)" def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None: print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}") if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): print(f"[DEBUG] test_hooks_enabled is True and ui_manual_approve is False; AUTO-APPROVING script execution in {base_dir}") self.ai_status = "running powershell..." output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback) self._append_tool_log(script, output) self.ai_status = "powershell done, awaiting AI..." return output dialog = ConfirmDialog(script, base_dir) is_headless = "--headless" in sys.argv if is_headless: with self._pending_dialog_lock: self._pending_actions[dialog._uid] = dialog print(f"[PENDING_ACTION] Created action {dialog._uid}") else: with self._pending_dialog_lock: self._pending_dialog = dialog # Notify API hook subscribers if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): print("[DEBUG] Pushing script_confirmation_required event to queue") with self._api_event_queue_lock: self._api_event_queue.append({ "type": "script_confirmation_required", "action_id": dialog._uid, "script": str(script), "base_dir": str(base_dir), "ts": time.time() }) approved, final_script = dialog.wait() if is_headless: with self._pending_dialog_lock: if dialog._uid in self._pending_actions: del self._pending_actions[dialog._uid] print(f"[DEBUG] _confirm_and_run result: approved={approved}") if not approved: self._append_tool_log(final_script, "REJECTED by user") return None self.ai_status = "running powershell..." print(f"[DEBUG] Running powershell in {base_dir}") output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) self._append_tool_log(final_script, output) self.ai_status = "powershell done, awaiting AI..." return output def resolve_pending_action(self, action_id: str, approved: bool) -> bool: """Resolves a pending PowerShell script confirmation by its ID. Args: action_id: The unique identifier for the pending action. approved: True if the script should be executed, False otherwise. Returns: bool: True if the action was found and resolved, False otherwise. """ with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True elif self._pending_dialog and self._pending_dialog._uid == action_id: dialog = self._pending_dialog with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True return False def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None: self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) self.ui_last_script_text = script self.ui_last_script_output = result self._trigger_script_blink = True self.show_script_output = True if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True def _flush_to_project(self) -> None: proj = self.project proj.setdefault("output", {})["output_dir"] = self.ui_output_dir proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir proj["files"]["paths"] = self.files proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir proj["screenshots"]["paths"] = self.screenshots proj.setdefault("project", {}) proj["project"]["git_dir"] = self.ui_project_git_dir proj["project"]["system_prompt"] = self.ui_project_system_prompt proj["project"]["main_context"] = self.ui_project_main_context proj["project"]["word_wrap"] = self.ui_word_wrap proj["project"]["summary_only"] = self.ui_summary_only proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path proj.setdefault("agent", {}).setdefault("tools", {}) for t_name in AGENT_TOOL_NAMES: proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) self._flush_disc_entries_to_project() disc_sec = proj.setdefault("discussion", {}) disc_sec["roles"] = self.disc_roles disc_sec["active"] = self.active_discussion disc_sec["auto_add"] = self.ui_auto_add_history # Save MMA State mma_sec = proj.setdefault("mma", {}) mma_sec["epic"] = self.ui_epic_input if self.active_track: # We only persist the basic metadata if full serialization is too complex # For now, let's try full serialization via asdict from dataclasses import asdict mma_sec["active_track"] = asdict(self.active_track) else: mma_sec["active_track"] = None def _flush_to_config(self) -> None: self.config["ai"] = { "provider": self.current_provider, "model": self.current_model, "temperature": self.temperature, "max_tokens": self.max_tokens, "history_trunc_limit": self.history_trunc_limit, } self.config["ai"]["system_prompt"] = self.ui_global_system_prompt self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} self.config["gui"] = {"show_windows": self.show_windows} theme.save_to_config(self.config) def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) track_id = self.active_track.id if self.active_track else None flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) full_md, path, file_items = aggregate.run(flat) # Build stable markdown (no history) for Gemini caching screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) screenshots = flat.get("screenshots", {}).get("paths", []) summary_only = flat.get("project", {}).get("summary_only", False) stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) # Build discussion history text separately history = flat.get("discussion", {}).get("history", []) discussion_text = aggregate.build_discussion_text(history) return full_md, path, file_items, stable_md, discussion_text def _fetch_models(self, provider: str) -> None: self.ai_status = "fetching models..." def do_fetch() -> None: try: models = ai_client.list_models(provider) self.available_models = models if self.current_model not in models and models: self.current_model = models[0] ai_client.set_provider(self.current_provider, self.current_model) self.ai_status = f"models loaded: {len(models)}" except Exception as e: self.ai_status = f"model fetch error: {e}" self.models_thread = threading.Thread(target=do_fetch, daemon=True) self.models_thread.start() # ---------------------------------------------------------------- helpers def _render_text_viewer(self, label: str, content: str) -> None: if imgui.button("[+]##" + str(id(content))): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content def _render_heavy_text(self, label: str, content: str) -> None: imgui.text_colored(C_LBL, f"{label}:") imgui.same_line() if imgui.button("[+]##" + label): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content if len(content) > COMMS_CLAMP_CHARS: if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(content) imgui.pop_text_wrap_pos() else: imgui.begin_child(f"heavy_text_child_{label}", imgui.ImVec2(0, 80), True) imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() else: if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(content if content else "(empty)") imgui.pop_text_wrap_pos() else: imgui.text(content if content else "(empty)") # ---------------------------------------------------------------- gui def _show_menus(self) -> None: if imgui.begin_menu("manual slop"): if imgui.menu_item("Quit", "Ctrl+Q", False)[0]: self.runner_params.app_shall_exit = True imgui.end_menu() if imgui.begin_menu("Windows"): for w in self.show_windows.keys(): _, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w]) imgui.end_menu() if imgui.begin_menu("Project"): if imgui.menu_item("Save All", "", False)[0]: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" if imgui.menu_item("Reset Session", "", False)[0]: ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.ai_status = "session reset" self.ai_response = "" if imgui.menu_item("Generate MD Only", "", False)[0]: try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" except Exception as e: self.ai_status = f"error: {e}" imgui.end_menu() def _gui_func(self) -> None: try: self.perf_monitor.start_frame() # Process GUI task queue self._process_pending_gui_tasks() self._render_track_proposal_modal() # Auto-save (every 60s) now = time.time() if now - self._last_autosave >= self._autosave_interval: self._last_autosave = now try: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) except Exception: pass # silent — don't disrupt the GUI loop # Sync pending comms with self._pending_comms_lock: if self._pending_comms and self.ui_auto_scroll_comms: self._scroll_comms_to_bottom = True for c in self._pending_comms: self._comms_log.append(c) self._pending_comms.clear() with self._pending_tool_calls_lock: if self._pending_tool_calls and self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True for tc in self._pending_tool_calls: self._tool_log.append(tc) self._pending_tool_calls.clear() if self.show_windows.get("Context Hub", False): exp, opened = imgui.begin("Context Hub", self.show_windows["Context Hub"]) self.show_windows["Context Hub"] = bool(opened) if exp: self._render_projects_panel() imgui.end() if self.show_windows.get("Files & Media", False): exp, opened = imgui.begin("Files & Media", self.show_windows["Files & Media"]) self.show_windows["Files & Media"] = bool(opened) if exp: if imgui.collapsing_header("Files"): self._render_files_panel() if imgui.collapsing_header("Screenshots"): self._render_screenshots_panel() imgui.end() if self.show_windows.get("AI Settings", False): exp, opened = imgui.begin("AI Settings", self.show_windows["AI Settings"]) self.show_windows["AI Settings"] = bool(opened) if exp: if imgui.collapsing_header("Provider & Model"): self._render_provider_panel() if imgui.collapsing_header("System Prompts"): self._render_system_prompts_panel() if imgui.collapsing_header("Token Budget"): self._render_token_budget_panel() imgui.end() if self.show_windows.get("MMA Dashboard", False): exp, opened = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"]) self.show_windows["MMA Dashboard"] = bool(opened) if exp: self._render_mma_dashboard() imgui.end() if self.show_windows.get("Tier 1: Strategy", False): exp, opened = imgui.begin("Tier 1: Strategy", self.show_windows["Tier 1: Strategy"]) self.show_windows["Tier 1: Strategy"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 1", "Tier 1") imgui.end() if self.show_windows.get("Tier 2: Tech Lead", False): exp, opened = imgui.begin("Tier 2: Tech Lead", self.show_windows["Tier 2: Tech Lead"]) self.show_windows["Tier 2: Tech Lead"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 2", "Tier 2 (Tech Lead)") imgui.end() if self.show_windows.get("Tier 3: Workers", False): exp, opened = imgui.begin("Tier 3: Workers", self.show_windows["Tier 3: Workers"]) self.show_windows["Tier 3: Workers"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 3", None) imgui.end() if self.show_windows.get("Tier 4: QA", False): exp, opened = imgui.begin("Tier 4: QA", self.show_windows["Tier 4: QA"]) self.show_windows["Tier 4: QA"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 4", "Tier 4 (QA)") imgui.end() if self.show_windows.get("Theme", False): self._render_theme_panel() if self.show_windows.get("Discussion Hub", False): exp, opened = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"]) self.show_windows["Discussion Hub"] = bool(opened) if exp: # Top part for the history imgui.begin_child("HistoryChild", size=(0, -200)) self._render_discussion_panel() imgui.end_child() # Bottom part with tabs for message and response if imgui.begin_tab_bar("MessageResponseTabs"): if imgui.begin_tab_item("Message")[0]: self._render_message_panel() imgui.end_tab_item() if imgui.begin_tab_item("Response")[0]: self._render_response_panel() imgui.end_tab_item() imgui.end_tab_bar() imgui.end() if self.show_windows.get("Operations Hub", False): exp, opened = imgui.begin("Operations Hub", self.show_windows["Operations Hub"]) self.show_windows["Operations Hub"] = bool(opened) if exp: imgui.text("Focus Agent:") imgui.same_line() focus_label = self.ui_focus_agent or "All" if imgui.begin_combo("##focus_agent", focus_label, imgui.ComboFlags_.width_fit_preview): if imgui.selectable("All", self.ui_focus_agent is None)[0]: self.ui_focus_agent = None for tier in ["Tier 2", "Tier 3", "Tier 4"]: if imgui.selectable(tier, self.ui_focus_agent == tier)[0]: self.ui_focus_agent = tier imgui.end_combo() imgui.same_line() if self.ui_focus_agent: if imgui.button("x##clear_focus"): self.ui_focus_agent = None imgui.separator() if imgui.begin_tab_bar("OperationsTabs"): if imgui.begin_tab_item("Tool Calls")[0]: self._render_tool_calls_panel() imgui.end_tab_item() if imgui.begin_tab_item("Comms History")[0]: self._render_comms_history_panel() imgui.end_tab_item() imgui.end_tab_bar() imgui.end() if self.show_windows.get("Log Management", False): self._render_log_management() if self.show_windows["Diagnostics"]: exp, opened = imgui.begin("Diagnostics", self.show_windows["Diagnostics"]) self.show_windows["Diagnostics"] = bool(opened) if exp: now = time.time() if now - self._perf_last_update >= 0.5: self._perf_last_update = now metrics = self.perf_monitor.get_metrics() self.perf_history["frame_time"].pop(0) self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0)) self.perf_history["fps"].pop(0) self.perf_history["fps"].append(metrics.get("fps", 0.0)) self.perf_history["cpu"].pop(0) self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0)) self.perf_history["input_lag"].pop(0) self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0)) metrics = self.perf_monitor.get_metrics() imgui.text("Performance Telemetry") imgui.separator() if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h): imgui.table_setup_column("Metric") imgui.table_setup_column("Value") imgui.table_headers_row() imgui.table_next_row() imgui.table_next_column() imgui.text("FPS") imgui.table_next_column() imgui.text(f"{metrics.get('fps', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Frame Time (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}") imgui.table_next_row() imgui.table_next_column() imgui.text("CPU %") imgui.table_next_column() imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Input Lag (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}") imgui.end_table() imgui.separator() imgui.text("Frame Time (ms)") imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60)) imgui.text("CPU %") imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60)) imgui.end() self.perf_monitor.end_frame() # ---- Modals / Popups with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: if not self._pending_dialog_open: imgui.open_popup("Approve PowerShell Command") self._pending_dialog_open = True else: self._pending_dialog_open = False if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]: if not dlg: imgui.close_current_popup() else: imgui.text("The AI wants to run the following PowerShell script:") imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}") imgui.separator() # Checkbox to toggle full preview inside modal _, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer) if self.show_text_viewer: imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True) imgui.text_unformatted(dlg._script) imgui.end_child() else: ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200)) imgui.separator() if imgui.button("Approve & Run", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.same_line() if imgui.button("Reject", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.end_popup() if self._pending_ask_dialog: if not self._ask_dialog_open: imgui.open_popup("Approve Tool Execution") self._ask_dialog_open = True else: self._ask_dialog_open = False if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_ask_dialog or self._ask_tool_data is None: imgui.close_current_popup() else: tool_name = self._ask_tool_data.get("tool", "unknown") tool_args = self._ask_tool_data.get("args", {}) imgui.text("The AI wants to execute a tool:") imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}") imgui.separator() imgui.text("Arguments:") imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True) imgui.text_unformatted(json.dumps(tool_args, indent=2)) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_approve_ask() imgui.close_current_popup() imgui.same_line() if imgui.button("Deny", imgui.ImVec2(120, 0)): self._handle_reject_ask() imgui.close_current_popup() imgui.end_popup() # MMA Step Approval Modal if self._pending_mma_approval: if not self._mma_approval_open: imgui.open_popup("MMA Step Approval") self._mma_approval_open = True self._mma_approval_edit_mode = False self._mma_approval_payload = self._pending_mma_approval.get("payload", "") else: self._mma_approval_open = False if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_approval: imgui.close_current_popup() else: ticket_id = self._pending_mma_approval.get("ticket_id", "??") imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.") imgui.separator() if self._mma_approval_edit_mode: imgui.text("Edit Raw Payload (Manual Memory Mutation):") _, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400)) else: imgui.text("Proposed Tool Call:") imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True) imgui.text_unformatted(str(self._pending_mma_approval.get("payload", ""))) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, payload=self._mma_approval_payload) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)): self._mma_approval_edit_mode = not self._mma_approval_edit_mode imgui.same_line() if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False) imgui.close_current_popup() imgui.end_popup() # MMA Spawn Approval Modal if self._pending_mma_spawn: if not self._mma_spawn_open: imgui.open_popup("MMA Spawn Approval") self._mma_spawn_open = True self._mma_spawn_edit_mode = False self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "") self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "") else: self._mma_spawn_open = False if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_spawn: imgui.close_current_popup() else: role = self._pending_mma_spawn.get("role", "??") ticket_id = self._pending_mma_spawn.get("ticket_id", "??") imgui.text(f"Spawning {role} for Ticket {ticket_id}") imgui.separator() if self._mma_spawn_edit_mode: imgui.text("Edit Prompt:") _, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200)) imgui.text("Edit Context MD:") _, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300)) else: imgui.text("Proposed Prompt:") imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True) imgui.text_unformatted(self._mma_spawn_prompt) imgui.end_child() imgui.text("Proposed Context MD:") imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True) imgui.text_unformatted(self._mma_spawn_context) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)): self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode imgui.same_line() if imgui.button("Abort", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False, abort=True) imgui.close_current_popup() imgui.end_popup() if self.show_script_output: if self._trigger_script_blink: self._trigger_script_blink = False self._is_script_blinking = True self._script_blink_start_time = time.time() try: imgui.set_window_focus("Last Script Output") # type: ignore[call-arg] except Exception: pass if self._is_script_blinking: elapsed = time.time() - self._script_blink_start_time if elapsed > 1.5: self._is_script_blinking = False else: val = math.sin(elapsed * 8 * math.pi) alpha = 60/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha)) imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever) expanded, opened = imgui.begin("Last Script Output", self.show_script_output) self.show_script_output = bool(opened) if expanded: imgui.text("Script:") imgui.same_line() self._render_text_viewer("Last Script", self.ui_last_script_text) if self.ui_word_wrap: imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_text) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only) imgui.separator() imgui.text("Output:") imgui.same_line() self._render_text_viewer("Last Output", self.ui_last_script_output) if self.ui_word_wrap: imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_output) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) if self._is_script_blinking: imgui.pop_style_color(2) imgui.end() if self.show_text_viewer: imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever) expanded, opened = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer) self.show_text_viewer = bool(opened) if expanded: if self.ui_word_wrap: imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.text_viewer_content) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end() except Exception as e: print(f"ERROR in _gui_func: {e}") import traceback traceback.print_exc() def _render_projects_panel(self) -> None: proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem) imgui.text_colored(C_IN, f"Active: {proj_name}") imgui.separator() imgui.text("Git Directory") ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir) imgui.same_line() if imgui.button("Browse##git"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Git Directory") r.destroy() if d: self.ui_project_git_dir = d imgui.separator() imgui.text("Main Context File") ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context) imgui.same_line() if imgui.button("Browse##ctx"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select Main Context File") r.destroy() if p: self.ui_project_main_context = p imgui.separator() imgui.text("Output Dir") ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir) imgui.same_line() if imgui.button("Browse##out"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Output Dir") r.destroy() if d: self.ui_output_dir = d imgui.separator() imgui.text("Project Files") imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True) for i, pp in enumerate(self.project_paths): is_active = (pp == self.active_project_path) if imgui.button(f"x##p{i}"): removed = self.project_paths.pop(i) if removed == self.active_project_path and self.project_paths: self._switch_project(self.project_paths[0]) break imgui.same_line() marker = " *" if is_active else "" if is_active: imgui.push_style_color(imgui.Col_.text, C_IN) if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"): self._switch_project(pp) if is_active: imgui.pop_style_color() imgui.same_line() imgui.text_colored(C_LBL, pp) imgui.end_child() if imgui.button("Add Project"): r = hide_tk_root() p = filedialog.askopenfilename( title="Select Project .toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")], ) r.destroy() if p and p not in self.project_paths: self.project_paths.append(p) imgui.same_line() if imgui.button("New Project"): r = hide_tk_root() p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")]) r.destroy() if p: name = Path(p).stem proj = project_manager.default_project(name) project_manager.save_project(proj, p) if p not in self.project_paths: self.project_paths.append(p) self._switch_project(p) imgui.same_line() if imgui.button("Save All"): self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap) ch, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only) ch, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms) ch, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls) if imgui.collapsing_header("Agent Tools"): for t_name in AGENT_TOOL_NAMES: val = self.ui_agent_tools.get(t_name, True) ch, val = imgui.checkbox(f"Enable {t_name}", val) if ch: self.ui_agent_tools[t_name] = val imgui.separator() imgui.text_colored(C_LBL, 'MMA Orchestration') _, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80)) if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)): self._cb_plan_epic() def _cb_plan_epic(self) -> None: def _bg_task() -> None: try: self.ai_status = "Planning Epic (Tier 1)..." history = orchestrator_pm.get_track_history_summary() proj = project_manager.load_project(self.active_project_path) flat = project_manager.flat_config(proj) file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", [])) _t1_baseline = len(ai_client.get_comms_log()) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) _t1_new = ai_client.get_comms_log()[_t1_baseline:] _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp) _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp) def _push_t1_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 1"]["input"] += i self.mma_tier_usage["Tier 1"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t1_usage, "args": [_t1_in, _t1_out] }) self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": { "text": json.dumps(tracks, indent=2), "stream_id": "Tier 1", "status": "Epic tracks generated." } }) self._pending_gui_tasks.append({ "action": "show_track_proposal", "payload": tracks }) except Exception as e: self.ai_status = f"Epic plan error: {e}" print(f"ERROR in _cb_plan_epic background task: {e}") threading.Thread(target=_bg_task, daemon=True).start() def _cb_accept_tracks(self) -> None: self._show_track_proposal_modal = False def _bg_task() -> None: # Generate skeletons once self.ai_status = "Phase 2: Generating skeletons for all tracks..." parser = ASTParser(language="python") generated_skeletons = "" try: for i, file_path in enumerate(self.files): try: self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." abs_path = Path(self.ui_files_base_dir) / file_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() generated_skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n" except Exception as e: print(f"Error parsing skeleton for {file_path}: {e}") except Exception as e: self.ai_status = f"Error generating skeletons: {e}" print(f"Error generating skeletons: {e}") return # Exit if skeleton generation fails # Now loop through tracks and call _start_track_logic with generated skeletons total_tracks = len(self.proposed_tracks) for i, track_data in enumerate(self.proposed_tracks): title = track_data.get("title") or track_data.get("goal", "Untitled Track") self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..." self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started self.ai_status = f"All {total_tracks} tracks accepted and execution started." threading.Thread(target=_bg_task, daemon=True).start() def _cb_start_track(self, user_data: Any = None) -> None: if isinstance(user_data, str): # If track_id is provided directly track_id = user_data # Ensure it's loaded as active if not self.active_track or self.active_track.id != track_id: self._cb_load_track(track_id) if self.active_track: # Use the active track object directly to start execution self.mma_status = "running" engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) full_md, _, _ = aggregate.run(flat) asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) self.ai_status = f"Track '{self.active_track.description}' started." return idx = 0 if isinstance(user_data, int): idx = user_data elif isinstance(user_data, dict): idx = user_data.get("index", 0) if 0 <= idx < len(self.proposed_tracks): track_data = self.proposed_tracks[idx] title = track_data.get("title") or track_data.get("goal", "Untitled Track") threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() self.ai_status = f"Track '{title}' started." def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None: try: goal = track_data.get("goal", "") title = track_data.get("title") or track_data.get("goal", "Untitled Track") self.ai_status = f"Phase 2: Generating tickets for {title}..." skeletons = "" # Initialize skeletons variable if skeletons_str is None: # Only generate if not provided # 1. Get skeletons for context parser = ASTParser(language="python") for i, file_path in enumerate(self.files): try: self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." abs_path = Path(self.ui_files_base_dir) / file_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n" except Exception as e: print(f"Error parsing skeleton for {file_path}: {e}") else: skeletons = skeletons_str # Use provided skeletons self.ai_status = "Phase 2: Calling Tech Lead..." _t2_baseline = len(ai_client.get_comms_log()) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) _t2_new = ai_client.get_comms_log()[_t2_baseline:] _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) self.mma_tier_usage["Tier 2"]["input"] += _t2_in self.mma_tier_usage["Tier 2"]["output"] += _t2_out if not raw_tickets: self.ai_status = f"Error: No tickets generated for track: {title}" print(f"Warning: No tickets generated for track: {title}") return self.ai_status = "Phase 2: Sorting tickets..." try: sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) except ValueError as e: print(f"Dependency error in track '{title}': {e}") sorted_tickets_data = raw_tickets # 3. Create Track and Ticket objects from datetime import datetime now = datetime.now() tickets = [] for t_data in sorted_tickets_data: ticket = Ticket( id=t_data["id"], description=t_data.get("description") or t_data.get("goal", "No description"), status=t_data.get("status", "todo"), assigned_to=t_data.get("assigned_to", "unassigned"), depends_on=t_data.get("depends_on", []), step_mode=t_data.get("step_mode", False) ) tickets.append(ticket) track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" track = Track(id=track_id, description=title, tickets=tickets) # Initialize track state in the filesystem from models import TrackState, Metadata from datetime import datetime now = datetime.now() meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now) state = TrackState(metadata=meta, discussion=[], tasks=tickets) project_manager.save_track_state(track_id, state, self.ui_files_base_dir) # 4. Initialize ConductorEngine and run loop engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) # Use current full markdown context for the track execution track_id_param = track.id flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) full_md, _, _ = aggregate.run(flat) # Schedule the coroutine on the internal event loop asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) except Exception as e: self.ai_status = f"Track start error: {e}" print(f"ERROR in _start_track_logic: {e}") def _render_track_proposal_modal(self) -> None: if self._show_track_proposal_modal: imgui.open_popup("Track Proposal") if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]: if not self._show_track_proposal_modal: imgui.close_current_popup() imgui.end_popup() return imgui.text_colored(C_IN, "Proposed Implementation Tracks") imgui.separator() if not self.proposed_tracks: imgui.text("No tracks generated.") else: for idx, track in enumerate(self.proposed_tracks): # Title Edit changed_t, new_t = imgui.input_text(f"Title##{idx}", track.get('title', '')) if changed_t: track['title'] = new_t # Goal Edit changed_g, new_g = imgui.input_text_multiline(f"Goal##{idx}", track.get('goal', ''), imgui.ImVec2(-1, 60)) if changed_g: track['goal'] = new_g # Buttons if imgui.button(f"Remove##{idx}"): self.proposed_tracks.pop(idx) break imgui.same_line() if imgui.button(f"Start This Track##{idx}"): self._cb_start_track(idx) imgui.separator() if imgui.button("Accept", imgui.ImVec2(120, 0)): self._cb_accept_tracks() self._show_track_proposal_modal = False imgui.close_current_popup() imgui.same_line() if imgui.button("Cancel", imgui.ImVec2(120, 0)): self._show_track_proposal_modal = False imgui.close_current_popup() imgui.end_popup() def _render_log_management(self) -> None: exp, opened = imgui.begin("Log Management", self.show_windows["Log Management"]) self.show_windows["Log Management"] = bool(opened) if not exp: imgui.end() return registry = LogRegistry("logs/log_registry.toml") sessions = registry.data if imgui.begin_table("sessions_table", 7, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Session ID") imgui.table_setup_column("Start Time") imgui.table_setup_column("Star") imgui.table_setup_column("Reason") imgui.table_setup_column("Size (KB)") imgui.table_setup_column("Msgs") imgui.table_setup_column("Actions") imgui.table_headers_row() for session_id, s_data in sessions.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(session_id) imgui.table_next_column() imgui.text(s_data.get("start_time", "")) imgui.table_next_column() whitelisted = s_data.get("whitelisted", False) if whitelisted: imgui.text_colored(vec4(255, 215, 0), "YES") else: imgui.text("NO") metadata = s_data.get("metadata") or {} imgui.table_next_column() imgui.text(metadata.get("reason", "")) imgui.table_next_column() imgui.text(str(metadata.get("size_kb", ""))) imgui.table_next_column() imgui.text(str(metadata.get("message_count", ""))) imgui.table_next_column() if whitelisted: if imgui.button(f"Unstar##{session_id}"): registry.update_session_metadata( session_id, message_count=int(metadata.get("message_count") or 0), errors=int(metadata.get("errors") or 0), size_kb=int(metadata.get("size_kb") or 0), whitelisted=False, reason=str(metadata.get("reason") or "") ) else: if imgui.button(f"Star##{session_id}"): registry.update_session_metadata( session_id, message_count=int(metadata.get("message_count") or 0), errors=int(metadata.get("errors") or 0), size_kb=int(metadata.get("size_kb") or 0), whitelisted=True, reason="Manually whitelisted" ) imgui.end_table() imgui.end() def _render_files_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir) imgui.same_line() if imgui.button("Browse##fb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_files_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True) for i, f in enumerate(self.files): if imgui.button(f"x##f{i}"): self.files.pop(i) break imgui.same_line() imgui.text(f) imgui.end_child() if imgui.button("Add File(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames() r.destroy() for p in paths: if p not in self.files: self.files.append(p) imgui.same_line() if imgui.button("Add Wildcard"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.files.append(str(Path(d) / "**" / "*")) def _render_screenshots_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir) imgui.same_line() if imgui.button("Browse##sb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_shots_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True) for i, s in enumerate(self.screenshots): if imgui.button(f"x##s{i}"): self.screenshots.pop(i) break imgui.same_line() imgui.text(s) imgui.end_child() if imgui.button("Add Screenshot(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames( title="Select Screenshots", filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")], ) r.destroy() for p in paths: if p not in self.screenshots: self.screenshots.append(p) def _render_discussion_panel(self) -> None: # THINKING indicator is_thinking = self.ai_status in ["sending..."] if is_thinking: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...") imgui.separator() # Prior session viewing mode if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20)) imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION") imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() imgui.separator() imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False) for idx, entry in enumerate(self.prior_session_entries): imgui.push_id(f"prior_{idx}") kind = entry.get("kind", entry.get("type", "")) imgui.text_colored(C_LBL, f"#{idx+1}") imgui.same_line() ts = entry.get("ts", entry.get("timestamp", "")) if ts: imgui.text_colored(vec4(160, 160, 160), str(ts)) imgui.same_line() imgui.text_colored(C_KEY, str(kind)) payload = entry.get("payload", entry) text = payload.get("text", payload.get("message", payload.get("content", ""))) if text: preview = str(text).replace("\\n", " ")[:200] if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(preview) imgui.pop_text_wrap_pos() else: imgui.text(preview) imgui.separator() imgui.pop_id() imgui.end_child() imgui.pop_style_color() return if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open): names = self._get_discussion_names() if imgui.begin_combo("##disc_sel", self.active_discussion): for name in names: is_selected = (name == self.active_discussion) if imgui.selectable(name, is_selected)[0]: self._switch_discussion(name) if is_selected: imgui.set_item_default_focus() imgui.end_combo() if self.active_track: imgui.same_line() changed, self._track_discussion_active = imgui.checkbox("Track Discussion", self._track_discussion_active) if changed: if self._track_discussion_active: self._flush_disc_entries_to_project() history_strings = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) with self._disc_entries_lock: self.disc_entries = _parse_history_entries(history_strings, self.disc_roles) self.ai_status = f"track discussion: {self.active_track.id}" else: self._flush_disc_entries_to_project() # Restore project discussion self._switch_discussion(self.active_discussion) disc_sec = self.project.get("discussion", {}) disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) git_commit = disc_data.get("git_commit", "") last_updated = disc_data.get("last_updated", "") imgui.text_colored(C_LBL, "commit:") imgui.same_line() imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)") imgui.same_line() if imgui.button("Update Commit"): git_dir = self.ui_project_git_dir if git_dir: cmt = project_manager.get_git_commit(git_dir) if cmt: disc_data["git_commit"] = cmt disc_data["last_updated"] = project_manager.now_ts() self.ai_status = f"commit: {cmt[:12]}" imgui.text_colored(C_LBL, "updated:") imgui.same_line() imgui.text_colored(C_SUB, last_updated if last_updated else "(never)") ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input) imgui.same_line() if imgui.button("Create"): nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Rename"): nm = self.ui_disc_new_name_input.strip() if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Delete"): self._delete_discussion(self.active_discussion) if not self.is_viewing_prior_session: imgui.separator() if imgui.button("+ Entry"): self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()}) imgui.same_line() if imgui.button("-All"): for e in self.disc_entries: e["collapsed"] = True imgui.same_line() if imgui.button("+All"): for e in self.disc_entries: e["collapsed"] = False imgui.same_line() if imgui.button("Clear All"): self.disc_entries.clear() imgui.same_line() if imgui.button("Save"): self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "discussion saved" ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history) # Truncation controls imgui.text("Keep Pairs:") imgui.same_line() imgui.set_next_item_width(80) ch, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1) if self.ui_disc_truncate_pairs < 1: self.ui_disc_truncate_pairs = 1 imgui.same_line() if imgui.button("Truncate"): with self._disc_entries_lock: self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs) self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs" imgui.separator() if imgui.collapsing_header("Roles"): imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True) for i, r in enumerate(self.disc_roles): if imgui.button(f"x##r{i}"): self.disc_roles.pop(i) break imgui.same_line() imgui.text(r) imgui.end_child() ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input) imgui.same_line() if imgui.button("Add"): r = self.ui_disc_new_role_input.strip() if r and r not in self.disc_roles: self.disc_roles.append(r) self.ui_disc_new_role_input = "" imgui.separator() imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False) clipper = imgui.ListClipper() clipper.begin(len(self.disc_entries)) while clipper.step(): for i in range(clipper.display_start, clipper.display_end): entry = self.disc_entries[i] imgui.push_id(str(i)) collapsed = entry.get("collapsed", False) read_mode = entry.get("read_mode", False) if imgui.button("+" if collapsed else "-"): entry["collapsed"] = not collapsed imgui.same_line() imgui.set_next_item_width(120) if imgui.begin_combo("##role", entry["role"]): for r in self.disc_roles: if imgui.selectable(r, r == entry["role"])[0]: entry["role"] = r imgui.end_combo() if not collapsed: imgui.same_line() if imgui.button("[Edit]" if read_mode else "[Read]"): entry["read_mode"] = not read_mode ts_str = entry.get("ts", "") if ts_str: imgui.same_line() imgui.text_colored(vec4(120, 120, 100), str(ts_str)) if collapsed: imgui.same_line() if imgui.button("Ins"): self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()}) imgui.same_line() self._render_text_viewer(f"Entry #{i+1}", entry["content"]) imgui.same_line() if imgui.button("Del"): self.disc_entries.pop(i) imgui.pop_id() break # Break from inner loop, clipper will re-step imgui.same_line() preview = entry["content"].replace("\\n", " ")[:60] if len(entry["content"]) > 60: preview += "..." imgui.text_colored(vec4(160, 160, 150), preview) if not collapsed: if read_mode: imgui.begin_child("read_content", imgui.ImVec2(0, 150), True) if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(entry["content"]) if self.ui_word_wrap: imgui.pop_text_wrap_pos() imgui.end_child() else: ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150)) imgui.separator() imgui.pop_id() if self._scroll_disc_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_disc_to_bottom = False imgui.end_child() def _render_provider_panel(self) -> None: imgui.text("Provider") if imgui.begin_combo("##prov", self.current_provider): for p in PROVIDERS: if imgui.selectable(p, p == self.current_provider)[0]: self.current_provider = p imgui.end_combo() imgui.separator() imgui.text("Model") imgui.same_line() if imgui.button("Fetch Models"): self._fetch_models(self.current_provider) if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)): for m in self.available_models: if imgui.selectable(m, m == self.current_model)[0]: self.current_model = m imgui.end_list_box() imgui.separator() imgui.text("Parameters") ch, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f") ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024) ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024) if self.current_provider == "gemini_cli": imgui.separator() imgui.text("Gemini CLI") sid = "None" if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: sid = ai_client._gemini_cli_adapter.session_id or "None" imgui.text(f"Session ID: {sid}") if imgui.button("Reset CLI Session"): ai_client.reset_session() imgui.text("Binary Path") ch, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path) imgui.same_line() if imgui.button("Browse##gcli"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select gemini CLI binary") r.destroy() if p: self.ui_gemini_cli_path = p if ch: if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path imgui.separator() imgui.text("Telemetry") usage = self.session_usage total = usage["input_tokens"] + usage["output_tokens"] if total == 0 and usage.get("total_tokens", 0) > 0: total = usage["total_tokens"] imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})") if usage.get("last_latency", 0.0) > 0: imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s") if usage["cache_read_input_tokens"]: imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}") if self._gemini_cache_text: imgui.text_colored(C_SUB, self._gemini_cache_text) def _render_token_budget_panel(self) -> None: if self._token_stats_dirty: self._token_stats_dirty = False self._refresh_api_metrics({}, md_content=self._last_stable_md or None) stats = self._token_stats if not stats: imgui.text_disabled("Token stats unavailable") return pct = stats.get("utilization_pct", 0.0) current = stats.get("estimated_prompt_tokens", stats.get("total_tokens", 0)) limit = stats.get("max_prompt_tokens", 0) headroom = stats.get("headroom_tokens", max(0, limit - current)) if pct < 50.0: color = imgui.ImVec4(0.2, 0.8, 0.2, 1.0) elif pct < 80.0: color = imgui.ImVec4(1.0, 0.8, 0.0, 1.0) else: color = imgui.ImVec4(1.0, 0.2, 0.2, 1.0) imgui.push_style_color(imgui.Col_.plot_histogram, color) imgui.progress_bar(pct / 100.0, imgui.ImVec2(-1, 0), f"{pct:.1f}%") imgui.pop_style_color() imgui.text_disabled(f"{current:,} / {limit:,} tokens ({headroom:,} remaining)") sys_tok = stats.get("system_tokens", 0) tool_tok = stats.get("tools_tokens", 0) hist_tok = stats.get("history_tokens", 0) total_tok = sys_tok + tool_tok + hist_tok or 1 if imgui.begin_table("token_breakdown", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.sizing_fixed_fit): imgui.table_setup_column("Component") imgui.table_setup_column("Tokens") imgui.table_setup_column("Pct") imgui.table_headers_row() for lbl, tok in [("System", sys_tok), ("Tools", tool_tok), ("History", hist_tok)]: imgui.table_next_row() imgui.table_set_column_index(0); imgui.text(lbl) imgui.table_set_column_index(1); imgui.text(f"{tok:,}") imgui.table_set_column_index(2); imgui.text(f"{tok / total_tok * 100:.0f}%") imgui.end_table() if stats.get("would_trim"): imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.0, 1.0), "WARNING: Next call will trim history") trimmable = stats.get("trimmable_turns", 0) if trimmable: imgui.text_disabled(f"Trimmable turns: {trimmable}") msgs = stats.get("messages") if msgs: shown = 0 for msg in msgs: if shown >= 3: break if msg.get("trimmable"): role = msg.get("role", "?") toks = msg.get("tokens", 0) imgui.text_disabled(f" [{role}] ~{toks:,} tokens") shown += 1 imgui.separator() if ai_client._provider == "gemini": if ai_client._gemini_cache is not None: age = time.time() - (ai_client._gemini_cache_created_at or time.time()) ttl = ai_client._GEMINI_CACHE_TTL imgui.text_colored(C_LBL, f"Gemini Cache: ACTIVE | Age: {age:.0f}s / {ttl}s | Renews at: {ttl * 0.9:.0f}s") else: imgui.text_disabled("Gemini Cache: INACTIVE") elif ai_client._provider == "anthropic": with ai_client._anthropic_history_lock: turns = len(ai_client._anthropic_history) cache_reads = 0 for entry in reversed(ai_client.get_comms_log()): if entry.get("kind") == "response": cache_reads = (entry.get("payload") or {}).get("usage", {}).get("cache_read_input_tokens") or 0 break imgui.text_disabled("Anthropic: 4-breakpoint ephemeral caching (auto-managed)") imgui.text_disabled(f" {turns} history turns | Cache reads last call: {cache_reads:,}") def _render_message_panel(self) -> None: # LIVE indicator is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."] if is_live: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, alpha), "LIVE") imgui.separator() ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40)) # Keyboard shortcuts io = imgui.get_io() ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter) ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l) if ctrl_l: self.ui_ai_input = "" imgui.separator() send_busy = False with self._send_thread_lock: if self.send_thread and self.send_thread.is_alive(): send_busy = True if (imgui.button("Gen + Send") or ctrl_enter) and not send_busy: self._handle_generate_send() imgui.same_line() if imgui.button("MD Only"): self._handle_md_only() imgui.same_line() if imgui.button("Reset"): self._handle_reset_session() imgui.same_line() if imgui.button("-> History"): if self.ui_ai_input: self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()}) def _render_response_panel(self) -> None: if self._trigger_blink: self._trigger_blink = False self._is_blinking = True self._blink_start_time = time.time() try: imgui.set_window_focus("Response") # type: ignore[call-arg] except: pass is_blinking = False if self._is_blinking: elapsed = time.time() - self._blink_start_time if elapsed > 1.5: self._is_blinking = False else: is_blinking = True val = math.sin(elapsed * 8 * math.pi) alpha = 50/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha)) # --- Always Render Content --- if self.ui_word_wrap: imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ai_response) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only) imgui.separator() if imgui.button("-> History"): if self.ai_response: self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()}) if is_blinking: imgui.pop_style_color(2) def _cb_ticket_retry(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'todo' break asyncio.run_coroutine_threadsafe( self.event_queue.put("mma_retry", {"ticket_id": ticket_id}), self._loop ) def _cb_ticket_skip(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'skipped' break asyncio.run_coroutine_threadsafe( self.event_queue.put("mma_skip", {"ticket_id": ticket_id}), self._loop ) def _cb_run_conductor_setup(self) -> None: base = Path("conductor") if not base.exists(): self.ui_conductor_setup_summary = "Error: conductor/ directory not found." return files = list(base.glob("**/*")) files = [f for f in files if f.is_file()] summary = [f"Conductor Directory: {base.absolute()}"] summary.append(f"Total Files: {len(files)}") total_lines = 0 for f in files: try: with open(f, "r", encoding="utf-8") as fd: lines = len(fd.readlines()) total_lines += lines summary.append(f"- {f.relative_to(base)}: {lines} lines") except Exception: summary.append(f"- {f.relative_to(base)}: Error reading") summary.append(f"Total Line Count: {total_lines}") tracks_dir = base / "tracks" if tracks_dir.exists(): tracks = [d for d in tracks_dir.iterdir() if d.is_dir()] summary.append(f"Total Tracks Found: {len(tracks)}") else: summary.append("Tracks Directory: Not found") self.ui_conductor_setup_summary = "\n".join(summary) def _cb_create_track(self, name: str, desc: str, track_type: str) -> None: if not name: return from datetime import datetime date_suffix = datetime.now().strftime("%Y%m%d") track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}" track_dir = Path("conductor/tracks") / track_id track_dir.mkdir(parents=True, exist_ok=True) spec_file = track_dir / "spec.md" with open(spec_file, "w", encoding="utf-8") as f: f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n") plan_file = track_dir / "plan.md" with open(plan_file, "w", encoding="utf-8") as f: f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n") meta_file = track_dir / "metadata.json" import json with open(meta_file, "w", encoding="utf-8") as f: json.dump({ "id": track_id, "title": name, "description": desc, "type": track_type, "status": "new", "progress": 0.0 }, f, indent=1) # Refresh tracks from disk self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) def _push_mma_state_update(self) -> None: if not self.active_track: return # Sync active_tickets (list of dicts) back to active_track.tickets (list of Ticket objects) self.active_track.tickets = [Ticket.from_dict(t) for t in self.active_tickets] # Save the state to disk from project_manager import save_track_state, load_track_state from models import TrackState, Metadata from datetime import datetime existing = load_track_state(self.active_track.id, self.ui_files_base_dir) meta = Metadata( id=self.active_track.id, name=self.active_track.description, status=self.mma_status, created_at=existing.metadata.created_at if existing else datetime.now(), updated_at=datetime.now() ) state = TrackState( metadata=meta, discussion=existing.discussion if existing else [], tasks=self.active_track.tickets ) save_track_state(self.active_track.id, state, self.ui_files_base_dir) def _render_tool_calls_panel(self) -> None: imgui.text("Tool call history") imgui.same_line() if imgui.button("Clear##tc"): self._tool_log.clear() imgui.separator() imgui.begin_child("scroll_area") clipper = imgui.ListClipper() tool_log_filtered = self._tool_log if not self.ui_focus_agent else [ e for e in self._tool_log if e.get("source_tier") == self.ui_focus_agent ] clipper = imgui.ListClipper() clipper.begin(len(tool_log_filtered)) while clipper.step(): for i_minus_one in range(clipper.display_start, clipper.display_end): i = i_minus_one + 1 entry = tool_log_filtered[i_minus_one] script = entry["script"] result = entry["result"] first_line = script.split('\n')[0] if script else 'Empty Script' imgui.text_colored(C_KEY, f"Call #{i}: {first_line}") # Script Display imgui.text_colored(C_LBL, "Script:") imgui.same_line() if imgui.button(f"[+]##script_{i}"): self.show_text_viewer = True self.text_viewer_title = f"Call Script #{i}" self.text_viewer_content = script if self.ui_word_wrap: imgui.begin_child(f"tc_script_wrap_{i}", imgui.ImVec2(-1, 72), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(script) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.begin_child(f"tc_script_fixed_width_{i}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar) imgui.input_text_multiline(f"##tc_script_res_{i}", script, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() # Result Display imgui.text_colored(C_LBL, "Output:") imgui.same_line() if imgui.button(f"[+]##output_{i}"): self.show_text_viewer = True self.text_viewer_title = f"Call Output #{i}" self.text_viewer_content = result if self.ui_word_wrap: imgui.begin_child(f"tc_res_wrap_{i}", imgui.ImVec2(-1, 72), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(result) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.begin_child(f"tc_res_fixed_width_{i}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar) imgui.input_text_multiline(f"##tc_res_val_{i}", result, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() imgui.separator() imgui.end_child() def _render_mma_dashboard(self) -> None: # Task 5.3: Dense Summary Line track_name = self.active_track.description if self.active_track else "None" total_tickets = len(self.active_tickets) done_tickets = sum(1 for t in self.active_tickets if t.get('status') == 'complete') total_cost = 0.0 for stats in self.mma_tier_usage.values(): model = stats.get('model', 'unknown') in_t = stats.get('input', 0) out_t = stats.get('output', 0) total_cost += cost_tracker.estimate_cost(model, in_t, out_t) imgui.text("Track:") imgui.same_line() imgui.text_colored(C_VAL, track_name) imgui.same_line() imgui.text(" | Tickets:") imgui.same_line() imgui.text_colored(C_VAL, f"{done_tickets}/{total_tickets}") imgui.same_line() imgui.text(" | Cost:") imgui.same_line() imgui.text_colored(imgui.ImVec4(0, 1, 0, 1), f"${total_cost:,.4f}") imgui.same_line() imgui.text(" | Status:") imgui.same_line() status_col = imgui.ImVec4(1, 1, 1, 1) if self.mma_status == "idle": status_col = imgui.ImVec4(0.7, 0.7, 0.7, 1) elif self.mma_status == "running": status_col = imgui.ImVec4(1, 1, 0, 1) elif self.mma_status == "done": status_col = imgui.ImVec4(0, 1, 0, 1) elif self.mma_status == "error": status_col = imgui.ImVec4(1, 0, 0, 1) imgui.text_colored(status_col, self.mma_status.upper()) imgui.separator() # 0. Conductor Setup if imgui.collapsing_header("Conductor Setup"): if imgui.button("Run Setup Scan"): self._cb_run_conductor_setup() if self.ui_conductor_setup_summary: imgui.input_text_multiline("##setup_summary", self.ui_conductor_setup_summary, imgui.ImVec2(-1, 120), imgui.InputTextFlags_.read_only) imgui.separator() # 1. Track Browser imgui.text("Track Browser") if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Title") imgui.table_setup_column("Status") imgui.table_setup_column("Progress") imgui.table_setup_column("Actions") imgui.table_headers_row() for track in self.tracks: imgui.table_next_row() imgui.table_next_column() imgui.text(track.get("title", "Untitled")) imgui.table_next_column() status = track.get("status", "unknown").lower() if status == "new": imgui.text_colored(imgui.ImVec4(0.7, 0.7, 0.7, 1.0), "NEW") elif status == "active": imgui.text_colored(imgui.ImVec4(1.0, 1.0, 0.0, 1.0), "ACTIVE") elif status == "done": imgui.text_colored(imgui.ImVec4(0.0, 1.0, 0.0, 1.0), "DONE") elif status == "blocked": imgui.text_colored(imgui.ImVec4(1.0, 0.0, 0.0, 1.0), "BLOCKED") else: imgui.text(status) imgui.table_next_column() progress = track.get("progress", 0.0) if progress < 0.33: p_color = imgui.ImVec4(1.0, 0.0, 0.0, 1.0) elif progress < 0.66: p_color = imgui.ImVec4(1.0, 1.0, 0.0, 1.0) else: p_color = imgui.ImVec4(0.0, 1.0, 0.0, 1.0) imgui.push_style_color(imgui.Col_.plot_histogram, p_color) imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%") imgui.pop_style_color() imgui.table_next_column() if imgui.button(f"Load##{track.get('id')}"): self._cb_load_track(str(track.get("id") or "")) imgui.end_table() # 1b. New Track Form imgui.text("Create New Track") changed_n, self.ui_new_track_name = imgui.input_text("Name##new_track", self.ui_new_track_name) changed_d, self.ui_new_track_desc = imgui.input_text_multiline("Description##new_track", self.ui_new_track_desc, imgui.ImVec2(-1, 60)) imgui.text("Type:") imgui.same_line() if imgui.begin_combo("##track_type", self.ui_new_track_type): for ttype in ["feature", "chore", "fix"]: if imgui.selectable(ttype, self.ui_new_track_type == ttype)[0]: self.ui_new_track_type = ttype imgui.end_combo() if imgui.button("Create Track"): self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type) self.ui_new_track_name = "" self.ui_new_track_desc = "" imgui.separator() # 2. Global Controls changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode) if changed: # We could push an event here if the engine needs to know immediately pass imgui.same_line() imgui.text(f"Status: {self.mma_status.upper()}") if self.active_tier: imgui.same_line() imgui.text_colored(C_VAL, f"| Active: {self.active_tier}") # Approval pending indicator any_pending = ( self._pending_mma_spawn is not None or self._pending_mma_approval is not None or self._pending_ask_dialog ) if any_pending: alpha = abs(math.sin(time.time() * 5)) imgui.same_line() imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.3, alpha), " APPROVAL PENDING") imgui.same_line() if imgui.button("Go to Approval"): pass # scroll/focus handled by existing dialog rendering imgui.separator() # 2. Active Track Info if self.active_track: imgui.text(f"Track: {self.active_track.description}") # Progress bar tickets = self.active_tickets total = len(tickets) if total > 0: complete = sum(1 for t in tickets if t.get('status') == 'complete') progress = complete / total imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets") else: imgui.text_disabled("No active MMA track.") # 3. Token Usage Table imgui.separator() imgui.text("Tier Usage (Tokens & Cost)") if imgui.begin_table("mma_usage", 5, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg): imgui.table_setup_column("Tier") imgui.table_setup_column("Model") imgui.table_setup_column("Input") imgui.table_setup_column("Output") imgui.table_setup_column("Est. Cost") imgui.table_headers_row() usage = self.mma_tier_usage total_cost = 0.0 for tier, stats in usage.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(tier) imgui.table_next_column() model = stats.get('model', 'unknown') imgui.text(model) imgui.table_next_column() in_t = stats.get('input', 0) imgui.text(f"{in_t:,}") imgui.table_next_column() out_t = stats.get('output', 0) imgui.text(f"{out_t:,}") imgui.table_next_column() cost = cost_tracker.estimate_cost(model, in_t, out_t) total_cost += cost imgui.text(f"${cost:,.4f}") # Total Row imgui.table_next_row() imgui.table_set_bg_color(imgui.TableBgTarget_.row_bg0, imgui.get_color_u32(imgui.Col_.plot_lines_hovered)) imgui.table_next_column() imgui.text("TOTAL") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text(f"${total_cost:,.4f}") imgui.end_table() imgui.separator() # 3b. Tier Model Config if imgui.collapsing_header("Tier Model Config"): for tier in self.mma_tier_usage.keys(): imgui.text(f"{tier}:") imgui.same_line() current_model = self.mma_tier_usage[tier].get("model", "unknown") if imgui.begin_combo(f"##combo_{tier}", current_model): for model in self.available_models: if imgui.selectable(model, current_model == model)[0]: self.mma_tier_usage[tier]["model"] = model self.project.setdefault("mma", {}).setdefault("tier_models", {})[tier] = model imgui.end_combo() imgui.separator() # 4. Task DAG Visualizer imgui.text("Task DAG") if self.active_track: tickets_by_id = {str(t.get('id') or ''): t for t in self.active_tickets} all_ids = set(tickets_by_id.keys()) # Build children map children_map: dict[str, list[str]] = {} for t in self.active_tickets: for dep in t.get('depends_on', []): if dep not in children_map: children_map[dep] = [] children_map[dep].append(str(t.get('id') or '')) # Roots are those whose depends_on elements are NOT in all_ids roots = [] for t in self.active_tickets: deps = t.get('depends_on', []) has_local_dep = any(d in all_ids for d in deps) if not has_local_dep: roots.append(t) rendered: set[str] = set() for root in roots: self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered) # 5. Add Ticket Form imgui.separator() if imgui.button("Add Ticket"): self._show_add_ticket_form = not self._show_add_ticket_form if self._show_add_ticket_form: # Default Ticket ID max_id = 0 for t in self.active_tickets: tid = t.get('id', '') if tid.startswith('T-'): try: max_id = max(max_id, int(tid[2:])) except: pass self.ui_new_ticket_id = f"T-{max_id + 1:03d}" self.ui_new_ticket_desc = "" self.ui_new_ticket_target = "" self.ui_new_ticket_deps = "" if self._show_add_ticket_form: imgui.begin_child("add_ticket_form", imgui.ImVec2(-1, 220), True) imgui.text_colored(C_VAL, "New Ticket Details") _, self.ui_new_ticket_id = imgui.input_text("ID##new_ticket", self.ui_new_ticket_id) _, self.ui_new_ticket_desc = imgui.input_text_multiline("Description##new_ticket", self.ui_new_ticket_desc, imgui.ImVec2(-1, 60)) _, self.ui_new_ticket_target = imgui.input_text("Target File##new_ticket", self.ui_new_ticket_target) _, self.ui_new_ticket_deps = imgui.input_text("Depends On (IDs, comma-separated)##new_ticket", self.ui_new_ticket_deps) if imgui.button("Create"): new_ticket = { "id": self.ui_new_ticket_id, "description": self.ui_new_ticket_desc, "status": "todo", "assigned_to": "tier3-worker", "target_file": self.ui_new_ticket_target, "depends_on": [d.strip() for d in self.ui_new_ticket_deps.split(",") if d.strip()] } self.active_tickets.append(new_ticket) self._show_add_ticket_form = False self._push_mma_state_update() imgui.same_line() if imgui.button("Cancel"): self._show_add_ticket_form = False imgui.end_child() else: imgui.text_disabled("No active MMA track.") def _render_tier_stream_panel(self, tier_key: str, stream_key: str | None) -> None: if stream_key is not None: content = self.mma_streams.get(stream_key, "") imgui.begin_child(f"##stream_content_{tier_key}", imgui.ImVec2(-1, -1)) imgui.text_wrapped(content) try: if len(content) != self._tier_stream_last_len.get(stream_key, -1): imgui.set_scroll_here_y(1.0) self._tier_stream_last_len[stream_key] = len(content) except (TypeError, AttributeError): pass imgui.end_child() else: tier3_keys = [k for k in self.mma_streams if "Tier 3" in k] if not tier3_keys: imgui.text_disabled("No worker output yet.") else: for key in tier3_keys: ticket_id = key.split(": ", 1)[-1] if ": " in key else key imgui.text(ticket_id) imgui.begin_child(f"##tier3_{ticket_id}_scroll", imgui.ImVec2(-1, 150), True) imgui.text_wrapped(self.mma_streams[key]) try: if len(self.mma_streams[key]) != self._tier_stream_last_len.get(key, -1): imgui.set_scroll_here_y(1.0) self._tier_stream_last_len[key] = len(self.mma_streams[key]) except (TypeError, AttributeError): pass imgui.end_child() def _render_ticket_dag_node(self, ticket: dict[str, Any], tickets_by_id: dict[str, Any], children_map: dict[str, list[str]], rendered: set[str]) -> None: tid = ticket.get('id', '??') is_duplicate = tid in rendered if not is_duplicate: rendered.add(tid) target = ticket.get('target_file', 'general') status = ticket.get('status', 'pending').upper() status_color = vec4(178, 178, 178) if status == 'RUNNING': status_color = vec4(255, 255, 0) elif status == 'COMPLETE': status_color = vec4(0, 255, 0) elif status in ['BLOCKED', 'ERROR']: status_color = vec4(255, 0, 0) elif status == 'PAUSED': status_color = vec4(255, 165, 0) p_min = imgui.get_cursor_screen_pos() p_max = imgui.ImVec2(p_min.x + 4, p_min.y + imgui.get_text_line_height()) imgui.get_window_draw_list().add_rect_filled(p_min, p_max, imgui.get_color_u32(status_color)) imgui.set_cursor_screen_pos(imgui.ImVec2(p_min.x + 8, p_min.y)) flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open children = children_map.get(tid, []) if not children or is_duplicate: flags |= imgui.TreeNodeFlags_.leaf node_open = imgui.tree_node_ex(f"##{tid}", flags) if imgui.is_item_hovered(): imgui.begin_tooltip() imgui.text_colored(C_KEY, f"ID: {tid}") imgui.text_colored(C_LBL, f"Target: {target}") imgui.text_colored(C_LBL, "Description:") imgui.same_line() imgui.text_wrapped(ticket.get('description', 'N/A')) deps = ticket.get('depends_on', []) if deps: imgui.text_colored(C_LBL, f"Depends on: {', '.join(deps)}") stream_key = f"Tier 3: {tid}" if stream_key in self.mma_streams: imgui.separator() imgui.text_colored(C_KEY, "Worker Stream:") imgui.text_wrapped(self.mma_streams[stream_key]) imgui.end_tooltip() imgui.same_line() imgui.text_colored(C_KEY, tid) imgui.same_line(150) imgui.text_disabled(str(target)) imgui.same_line(400) imgui.text_colored(status_color, status) imgui.same_line(500) if imgui.button(f"Retry##{tid}"): self._cb_ticket_retry(tid) imgui.same_line() if imgui.button(f"Skip##{tid}"): self._cb_ticket_skip(tid) if status in ['TODO', 'BLOCKED']: imgui.same_line() if imgui.button(f"Delete##{tid}"): self.active_tickets = [t for t in self.active_tickets if t.get('id') != tid] for t in self.active_tickets: deps = t.get('depends_on', []) if tid in deps: t['depends_on'] = [d for d in deps if d != tid] self._push_mma_state_update() if is_duplicate: imgui.same_line() imgui.text_disabled("(shown above)") if node_open and not is_duplicate: for child_id in children: child = tickets_by_id.get(child_id) if child: self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered) imgui.tree_pop() def _render_comms_history_panel(self) -> None: imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}") imgui.same_line() if imgui.button("Clear##comms"): ai_client.clear_comms_log() self._comms_log.clear() imgui.same_line() if imgui.button("Load Log"): self.cb_load_prior_log() if self.is_viewing_prior_session: imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() self.ai_status = "idle" imgui.separator() imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION") imgui.separator() imgui.text_colored(C_OUT, "OUT") imgui.same_line() imgui.text_colored(C_REQ, "request") imgui.same_line() imgui.text_colored(C_TC, "tool_call") imgui.same_line() imgui.text(" ") imgui.same_line() imgui.text_colored(C_IN, "IN") imgui.same_line() imgui.text_colored(C_RES, "response") imgui.same_line() imgui.text_colored(C_TR, "tool_result") imgui.separator() # Use tinted background for prior session if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20)) imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar) log_to_render = self.prior_session_entries if self.is_viewing_prior_session else list(self._comms_log) if self.ui_focus_agent and not self.is_viewing_prior_session: log_to_render = [e for e in log_to_render if e.get("source_tier") == self.ui_focus_agent] for idx_minus_one, entry in enumerate(log_to_render): idx = idx_minus_one + 1 local_ts = entry.get("local_ts", 0) # Blink effect blink_alpha = 0.0 if local_ts > 0 and not self.is_viewing_prior_session: elapsed = time.time() - local_ts if elapsed < 3.0: blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5) imgui.push_id(f"comms_{idx}") if blink_alpha > 0: # Draw a background highlight for the entry imgui.get_window_draw_list() imgui.get_cursor_screen_pos() # Estimate height or just use a fixed height for the background # It's better to wrap the entry in a group or just use separators # For now, let's just use the style color push if we are sure we pop it imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha)) # We still need a child or a group to apply the background to imgui.begin_group() d = entry.get("direction", "IN") k = entry.get("kind", "response") imgui.text_colored(vec4(160, 160, 160), f"#{idx}") imgui.same_line() imgui.text_colored(vec4(160, 160, 160), entry.get("ts", "00:00:00")) imgui.same_line() imgui.text_colored(DIR_COLORS.get(d, C_VAL), d) imgui.same_line() imgui.text_colored(KIND_COLORS.get(k, C_VAL), k) imgui.same_line() imgui.text_colored(C_LBL, f"{entry.get('provider', '?')}/{entry.get('model', '?')}") imgui.same_line() tier_label = entry.get("source_tier") or "main" imgui.text_colored(C_SUB, f"[{tier_label}]") payload = entry.get("payload", {}) if k == "request": self._render_heavy_text("message", payload.get("message", "")) elif k == "response": imgui.text_colored(C_LBL, "round:") imgui.same_line() imgui.text_colored(C_VAL, str(payload.get("round", ""))) imgui.text_colored(C_LBL, "stop_reason:") imgui.same_line() imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", ""))) text = payload.get("text", "") if text: self._render_heavy_text("text", text) imgui.text_colored(C_LBL, "tool_calls:") tcs = payload.get("tool_calls", []) if not tcs: imgui.text_colored(C_VAL, " (none)") for tc_i, tc in enumerate(tcs): imgui.text_colored(C_KEY, f" call[{tc_i}] {tc.get('name', '?')}") if tc.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(tc["id"])) if "args" in tc or "input" in tc: self._render_heavy_text(f"call_{tc_i}_args", str(tc.get("args") or tc.get("input"))) elif k == "tool_call": imgui.text_colored(C_KEY, payload.get("name", "?")) if payload.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(payload["id"])) if "script" in payload: self._render_heavy_text("script", payload["script"]) if "args" in payload: self._render_heavy_text("args", str(payload["args"])) elif k == "tool_result": imgui.text_colored(C_KEY, payload.get("name", "?")) if payload.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(payload["id"])) if "output" in payload: self._render_heavy_text("output", payload["output"]) if "results" in payload: for r_i, r in enumerate(payload["results"]): imgui.text_colored(C_LBL, f" Result[{r_i}]:") self._render_heavy_text(f"res_{r_i}", str(r)) if "usage" in payload: u = payload["usage"] u_str = f"In: {u.get('input_tokens', 0)} Out: {u.get('output_tokens', 0)}" if u.get("cache_read_input_tokens"): u_str += f" (Cache: {u['cache_read_input_tokens']})" imgui.text_colored(C_SUB, f" Usage: {u_str}") imgui.separator() if blink_alpha > 0: imgui.end_group() imgui.pop_style_color() imgui.pop_id() if self._scroll_comms_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_comms_to_bottom = False imgui.end_child() if self.is_viewing_prior_session: imgui.pop_style_color() def _render_system_prompts_panel(self) -> None: imgui.text("Global System Prompt (all projects)") ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100)) imgui.separator() imgui.text("Project System Prompt") ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100)) def _render_theme_panel(self) -> None: exp, opened = imgui.begin("Theme", self.show_windows["Theme"]) self.show_windows["Theme"] = bool(opened) if exp: imgui.text("Palette") cp = theme.get_current_palette() if imgui.begin_combo("##pal", cp): for p in theme.get_palette_names(): if imgui.selectable(p, p == cp)[0]: theme.apply(p) imgui.end_combo() imgui.separator() imgui.text("Font") imgui.push_item_width(-150) ch, path = imgui.input_text("##fontp", theme.get_current_font_path()) imgui.pop_item_width() if ch: theme._current_font_path = path imgui.same_line() if imgui.button("Browse##font"): r = hide_tk_root() p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")]) r.destroy() if p: theme._current_font_path = p imgui.text("Size (px)") imgui.same_line() imgui.push_item_width(100) ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f") if ch: theme._current_font_size = size imgui.pop_item_width() imgui.same_line() if imgui.button("Apply Font (Requires Restart)"): self._flush_to_config() save_config(self.config) self.ai_status = "Font settings saved. Restart required." imgui.separator() imgui.text("UI Scale (DPI)") ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f") if ch: theme.set_scale(scale) imgui.end() def _load_fonts(self) -> None: font_path, font_size = theme.get_font_loading_params() if font_path and Path(font_path).exists(): hello_imgui.load_font(font_path, font_size) def _post_init(self) -> None: theme.apply_current() def run(self) -> None: """Initializes the ImGui runner and starts the main application loop.""" if "--headless" in sys.argv: print("Headless mode active") self._fetch_models(self.current_provider) import uvicorn headless_cfg = self.config.get("headless", {}) port = headless_cfg.get("port", 8000) api = self.create_api() uvicorn.run(api, host="0.0.0.0", port=port) else: theme.load_from_config(self.config) self.runner_params = hello_imgui.RunnerParams() self.runner_params.app_window_params.window_title = "manual slop" self.runner_params.app_window_params.window_geometry.size = (1680, 1200) self.runner_params.imgui_window_params.enable_viewports = False self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space self.runner_params.fps_idling.enable_idling = False self.runner_params.imgui_window_params.show_menu_bar = True self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder self.runner_params.ini_filename = "manualslop_layout.ini" self.runner_params.callbacks.show_gui = self._gui_func self.runner_params.callbacks.show_menus = self._show_menus self.runner_params.callbacks.load_additional_fonts = self._load_fonts self.runner_params.callbacks.post_init = self._post_init self._fetch_models(self.current_provider) immapp.run(self.runner_params) # On exit self.shutdown() session_logger.close_session() def main() -> None: app = App() app.run() if __name__ == "__main__": main()