# gui_2.py from __future__ import annotations import tomli_w import threading import asyncio import time import math import json import sys import os import uuid import requests from pathlib import Path from tkinter import filedialog, Tk from typing import Optional, Callable, Any, Dict, List, Tuple, Union import aggregate import ai_client from ai_client import ProviderError import shell_runner import session_logger import project_manager import theme_2 as theme import tomllib import events import numpy as np import api_hooks import mcp_client import orchestrator_pm from performance_monitor import PerformanceMonitor from log_registry import LogRegistry from log_pruner import LogPruner import conductor_tech_lead import multi_agent_conductor from models import Track, Ticket from file_cache import ASTParser from fastapi import FastAPI, Depends, HTTPException, Security from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel from imgui_bundle import imgui, hello_imgui, immapp CONFIG_PATH: Path = Path("config.toml") PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"] COMMS_CLAMP_CHARS: int = 300 def load_config() -> dict[str, Any]: with open(CONFIG_PATH, "rb") as f: return tomllib.load(f) def save_config(config: dict[str, Any]) -> None: with open(CONFIG_PATH, "wb") as f: tomli_w.dump(config, f) def hide_tk_root() -> Tk: root = Tk() root.withdraw() root.wm_attributes("-topmost", True) return root # Color Helpers def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4: return imgui.ImVec4(r/255, g/255, b/255, a) C_OUT: tuple[float, ...] = vec4(100, 200, 255) C_IN: tuple[float, ...] = vec4(140, 255, 160) C_REQ: tuple[float, ...] = vec4(255, 220, 100) C_RES: tuple[float, ...] = vec4(180, 255, 180) C_TC: tuple[float, ...] = vec4(255, 180, 80) C_TR: tuple[float, ...] = vec4(180, 220, 255) C_TRS: tuple[float, ...] = vec4(200, 180, 255) C_LBL: tuple[float, ...] = vec4(180, 180, 180) C_VAL: tuple[float, ...] = vec4(220, 220, 220) C_KEY: tuple[float, ...] = vec4(140, 200, 255) C_NUM: tuple[float, ...] = vec4(180, 255, 180) C_SUB: tuple[float, ...] = vec4(220, 200, 120) DIR_COLORS: dict[str, tuple[float, ...]] = {"OUT": C_OUT, "IN": C_IN} KIND_COLORS: dict[str, tuple[float, ...]] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS} HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"} DISC_ROLES: list[str] = ["User", "AI", "Vendor API", "System"] AGENT_TOOL_NAMES: list[str] = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"] def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]: if max_pairs <= 0: return [] target_count = max_pairs * 2 if len(entries) <= target_count: return entries return entries[-target_count:] def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict[str, Any]]: known = roles if roles is not None else DISC_ROLES entries = [] for raw in history: entries.append(project_manager.str_to_entry(raw, known)) return entries class ConfirmDialog: def __init__(self, script: str, base_dir: str) -> None: self._uid = str(uuid.uuid4()) self._script = str(script) if script is not None else "" self._base_dir = str(base_dir) if base_dir is not None else "" self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return self._approved, self._script class MMAApprovalDialog: def __init__(self, ticket_id: str, payload: str) -> None: self._ticket_id = ticket_id self._payload = payload self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return self._approved, self._payload class MMASpawnApprovalDialog: def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: self._ticket_id = ticket_id self._role = role self._prompt = prompt self._context_md = context_md self._condition = threading.Condition() self._done = False self._approved = False self._abort = False def wait(self) -> dict[str, Any]: with self._condition: while not self._done: self._condition.wait(timeout=0.1) return { 'approved': self._approved, 'abort': self._abort, 'prompt': self._prompt, 'context_md': self._context_md } class App: """The main ImGui interface orchestrator for Manual Slop.""" def __init__(self) -> None: self.config = load_config() self.event_queue = events.AsyncEventQueue() self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() ai_cfg = self.config.get("ai", {}) self._current_provider: str = ai_cfg.get("provider", "gemini") self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite") self.available_models: list[str] = [] self.temperature: float = ai_cfg.get("temperature", 0.0) self.max_tokens: int = ai_cfg.get("max_tokens", 8192) self.history_trunc_limit: int = ai_cfg.get("history_trunc_limit", 8000) projects_cfg = self.config.get("projects", {}) self.project_paths: list[str] = list(projects_cfg.get("paths", [])) self.active_project_path: str = projects_cfg.get("active", "") self.project: dict[str, Any] = {} self.active_discussion: str = "main" self._load_active_project() self.files: list[str] = list(self.project.get("files", {}).get("paths", [])) self.screenshots: list[str] = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) self.disc_entries: list[dict[str, Any]] = _parse_history_entries(disc_data.get("history", []), self.disc_roles) self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_main_context = proj_meta.get("main_context", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_word_wrap = proj_meta.get("word_wrap", True) self.ui_summary_only = proj_meta.get("summary_only", False) self.ui_auto_add_history = disc_sec.get("auto_add", False) self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "") self.ui_ai_input = "" self.ui_disc_new_name_input = "" self.ui_disc_new_role_input = "" self.ui_epic_input = "" self.proposed_tracks: list[dict[str, Any]] = [] self._show_track_proposal_modal = False self.ui_last_script_text = "" self.ui_last_script_output = "" self.ai_status = "idle" self.ai_response = "" self.last_md = "" self.last_md_path: Path | None = None self.last_file_items: list[Any] = [] self.send_thread: threading.Thread | None = None self._send_thread_lock = threading.Lock() self.models_thread: threading.Thread | None = None _default_windows = { "Context Hub": True, "Files & Media": True, "AI Settings": True, "MMA Dashboard": True, "Discussion Hub": True, "Operations Hub": True, "Theme": True, "Log Management": False, "Diagnostics": False, } saved = self.config.get("gui", {}).get("show_windows", {}) self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()} self.show_script_output = False self.show_text_viewer = False self.text_viewer_title = "" self.text_viewer_content = "" self._pending_dialog: ConfirmDialog | None = None self._pending_dialog_open = False self._pending_dialog_lock = threading.Lock() self._pending_actions: dict[str, ConfirmDialog] = {} self._pending_ask_dialog = False self._ask_dialog_open = False self._ask_request_id = None self._ask_tool_data = None self.mma_step_mode = False self.active_track: Track | None = None self.active_tickets: list[dict[str, Any]] = [] self.active_tier: str | None = None self.mma_status = "idle" self._pending_mma_approval: dict[str, Any] | None = None self._mma_approval_open = False self._mma_approval_edit_mode = False self._mma_approval_payload = "" self._pending_mma_spawn: dict[str, Any] | None = None self._mma_spawn_open = False self._mma_spawn_edit_mode = False self._mma_spawn_prompt = '' self._mma_spawn_context = '' self.ui_epic_input = "" self.proposed_tracks: list[dict[str, Any]] = [] self._show_track_proposal_modal = False self.mma_tier_usage = { "Tier 1": {"input": 0, "output": 0}, "Tier 2": {"input": 0, "output": 0}, "Tier 3": {"input": 0, "output": 0}, "Tier 4": {"input": 0, "output": 0}, } self._tool_log: list[tuple[str, str, float]] = [] self._comms_log: list[dict[str, Any]] = [] self._pending_comms: list[dict[str, Any]] = [] self._pending_comms_lock = threading.Lock() self._pending_tool_calls: list[tuple[str, str, float]] = [] self._pending_tool_calls_lock = threading.Lock() self._pending_history_adds: list[dict[str, Any]] = [] self._pending_history_adds_lock = threading.Lock() self._trigger_blink = False self._is_blinking = False self._blink_start_time = 0.0 self._trigger_script_blink = False self._is_script_blinking = False self._script_blink_start_time = 0.0 self._scroll_disc_to_bottom = False self._scroll_comms_to_bottom = False self._scroll_tool_calls_to_bottom = False self._pending_gui_tasks: list[dict[str, Any]] = [] self._pending_gui_tasks_lock = threading.Lock() self.session_usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0} self._token_budget_pct = 0.0 self._token_budget_current = 0 self._token_budget_limit = 0 self._gemini_cache_text = "" self.ui_disc_truncate_pairs: int = 2 self.ui_auto_scroll_comms = True self.ui_auto_scroll_tool_calls = True agent_tools_cfg = self.project.get("agent", {}).get("tools", {}) self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} self.tracks: list[dict[str, Any]] = [] self.mma_streams: dict[str, str] = {} self.is_viewing_prior_session = False self.prior_session_entries: list[dict[str, Any]] = [] self.test_hooks_enabled = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1") self.perf_monitor = PerformanceMonitor() self.perf_history = {"frame_time": [0.0]*100, "fps": [0.0]*100, "cpu": [0.0]*100, "input_lag": [0.0]*100} self._perf_last_update = 0.0 self._autosave_interval = 60.0 self._last_autosave = time.time() label = self.project.get("project", {}).get("name", "") session_logger.open_session(label=label) self._prune_old_logs() self._init_ai_and_hooks() def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" def run_prune() -> None: try: registry = LogRegistry("logs/log_registry.toml") pruner = LogPruner(registry, "logs") pruner.prune() except Exception as e: print(f"Error during log pruning: {e}") thread = threading.Thread(target=run_prune, daemon=True) thread.start() @property def current_provider(self) -> str: return self._current_provider @current_provider.setter def current_provider(self, value: str) -> None: if value != self._current_provider: self._current_provider = value ai_client.reset_session() ai_client.set_provider(value, self.current_model) if value == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path if hasattr(self, 'hook_server'): self.hook_server.start() self.available_models = [] self._fetch_models(value) @property def current_model(self) -> str: return self._current_model @current_model.setter def current_model(self, value: str) -> None: if value != self._current_model: self._current_model = value ai_client.reset_session() ai_client.set_provider(self.current_provider, value) def _init_ai_and_hooks(self) -> None: ai_client.set_provider(self.current_provider, self.current_model) if self.current_provider == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path ai_client.confirm_and_run_callback = self._confirm_and_run ai_client.comms_log_callback = self._on_comms_entry ai_client.tool_log_callback = self._on_tool_log mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics self.perf_monitor.alert_callback = self._on_performance_alert ai_client.events.on("request_start", self._on_api_event) ai_client.events.on("response_received", self._on_api_event) ai_client.events.on("tool_execution", self._on_api_event) self._settable_fields: dict[str, str] = { 'ai_input': 'ui_ai_input', 'project_git_dir': 'ui_project_git_dir', 'auto_add_history': 'ui_auto_add_history', 'disc_new_name_input': 'ui_disc_new_name_input', 'project_main_context': 'ui_project_main_context', 'gcli_path': 'ui_gemini_cli_path', 'output_dir': 'ui_output_dir', 'files_base_dir': 'ui_files_base_dir', 'ai_status': 'ai_status', 'ai_response': 'ai_response', 'active_discussion': 'active_discussion', 'current_provider': 'current_provider', 'current_model': 'current_model', 'token_budget_pct': '_token_budget_pct', 'token_budget_current': '_token_budget_current', 'token_budget_label': '_token_budget_label', 'show_confirm_modal': 'show_confirm_modal', 'mma_epic_input': 'ui_epic_input', 'mma_status': 'mma_status', 'mma_active_tier': 'active_tier' } self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, 'btn_md_only': self._handle_md_only, 'btn_approve_script': self._handle_approve_script, 'btn_reject_script': self._handle_reject_script, 'btn_project_save': self._cb_project_save, 'btn_disc_create': self._cb_disc_create, 'btn_mma_plan_epic': self._cb_plan_epic, 'btn_mma_accept_tracks': self._cb_accept_tracks, 'btn_mma_start_track': self._cb_start_track, } self._predefined_callbacks: dict[str, Callable[..., Any]] = { '_test_callback_func_write_to_file': self._test_callback_func_write_to_file } self._discussion_names_cache: list[str] = [] self._discussion_names_dirty: bool = True def create_api(self) -> FastAPI: """Creates and configures the FastAPI application for headless mode.""" api = FastAPI(title="Manual Slop Headless API") class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True temperature: float | None = None max_tokens: int | None = None class ConfirmRequest(BaseModel): approved: bool API_KEY_NAME = "X-API-KEY" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(header_key: str = Depends(api_key_header)) -> str: """Validates the API key from the request header against configuration.""" headless_cfg = self.config.get("headless", {}) config_key = headless_cfg.get("api_key", "").strip() env_key = os.environ.get("SLOP_API_KEY", "").strip() target_key = env_key or config_key if not target_key: raise HTTPException(status_code=403, detail="API Key not configured on server") if header_key == target_key: return header_key raise HTTPException(status_code=403, detail="Could not validate API Key") @api.get("/health") def health() -> dict[str, str]: """Basic health check endpoint.""" return {"status": "ok"} @api.get("/status", dependencies=[Depends(get_api_key)]) def status() -> dict[str, Any]: """Returns the current status of the AI provider and active project.""" return { "provider": self.current_provider, "model": self.current_model, "active_project": self.active_project_path, "ai_status": self.ai_status, "session_usage": self.session_usage } @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) def pending_actions() -> list[dict[str, Any]]: """Lists all PowerShell scripts awaiting manual confirmation.""" actions = [] with self._pending_dialog_lock: for uid, dialog in self._pending_actions.items(): actions.append({ "action_id": uid, "script": dialog._script, "base_dir": dialog._base_dir }) if self._pending_dialog: actions.append({ "action_id": self._pending_dialog._uid, "script": self._pending_dialog._script, "base_dir": self._pending_dialog._base_dir }) return actions @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, Any]: """Approves or denies a pending PowerShell script execution.""" success = self.resolve_pending_action(action_id, req.approved) if not success: raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found") return {"status": "success", "action_id": action_id, "approved": req.approved} @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) def list_sessions() -> list[str]: """Lists all available session log files.""" log_dir = Path("logs") if not log_dir.exists(): return [] return sorted([f.name for f in log_dir.glob("*.log")], reverse=True) @api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)]) def get_session(filename: str) -> dict[str, str]: """Retrieves the content of a specific session log file.""" if ".." in filename or "/" in filename or "\\" in filename: raise HTTPException(status_code=400, detail="Invalid filename") log_path = Path("logs") / filename if not log_path.exists() or not log_path.is_file(): raise HTTPException(status_code=404, detail="Session log not found") try: content = log_path.read_text(encoding="utf-8") return {"filename": filename, "content": content} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)]) def delete_session(filename: str) -> dict[str, str]: """Deletes a specific session log file.""" if ".." in filename or "/" in filename or "\\" in filename: raise HTTPException(status_code=400, detail="Invalid filename") log_path = Path("logs") / filename if not log_path.exists() or not log_path.is_file(): raise HTTPException(status_code=404, detail="Session log not found") try: log_path.unlink() return {"status": "success", "message": f"Deleted {filename}"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) def get_context() -> dict[str, Any]: """Returns the current file and screenshot context configuration.""" return { "files": self.files, "screenshots": self.screenshots, "files_base_dir": self.ui_files_base_dir, "screenshots_base_dir": self.ui_shots_base_dir } @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) def generate(req: GenerateRequest) -> dict[str, Any]: """Triggers an AI generation request using the current project context.""" if not req.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") with self._send_thread_lock: start_time = time.time() try: md, path, file_items, stable_md, disc_text = self._do_generate() self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") user_msg = req.prompt base_dir = self.ui_files_base_dir csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) temp = req.temperature if req.temperature is not None else self.temperature tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens ai_client.set_model_params(temp, tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts() }) try: resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text) if req.auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts() }) self._recalculate_session_usage() duration = time.time() - start_time return { "text": resp, "metadata": { "provider": self.current_provider, "model": self.current_model, "duration_sec": round(duration, 3), "timestamp": project_manager.now_ts() }, "usage": self.session_usage } except ProviderError as e: raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") except Exception as e: raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) async def stream(req: GenerateRequest) -> Any: """Placeholder for streaming AI generation responses (Not yet implemented).""" raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") return api # ---------------------------------------------------------------- project loading def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem proj = project_manager.default_project(name) project_manager.save_project(proj, user_data) if user_data not in self.project_paths: self.project_paths.append(user_data) self._switch_project(user_data) def _cb_project_save(self) -> None: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" def _cb_disc_create(self) -> None: nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm) self.ui_disc_new_name_input = "" def _load_active_project(self) -> None: if self.active_project_path and Path(self.active_project_path).exists(): try: self.project = project_manager.load_project(self.active_project_path) return except Exception as e: print(f"Failed to load project {self.active_project_path}: {e}") for pp in self.project_paths: if Path(pp).exists(): try: self.project = project_manager.load_project(pp) self.active_project_path = pp return except Exception: continue self.project = project_manager.migrate_from_legacy_config(self.config) name = self.project.get("project", {}).get("name", "project") fallback_path = f"{name}.toml" project_manager.save_project(self.project, fallback_path) self.active_project_path = fallback_path if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) def _switch_project(self, path: str) -> None: if not Path(path).exists(): self.ai_status = f"project file not found: {path}" return self._flush_to_project() self._save_active_project() try: self.project = project_manager.load_project(path) self.active_project_path = path except Exception as e: self.ai_status = f"failed to load project: {e}" return self._refresh_from_project() self._discussion_names_dirty = True ai_client.reset_session() self.ai_status = f"switched to: {Path(path).stem}" def _refresh_from_project(self) -> None: self.files = list(self.project.get("files", {}).get("paths", [])) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES))) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles) proj = self.project self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "") self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "") self.ui_project_main_context = proj.get("project", {}).get("main_context", "") self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) self.ui_summary_only = proj.get("project", {}).get("summary_only", False) agent_tools_cfg = proj.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES} # MMA Tracks self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir) # Restore MMA state mma_sec = proj.get("mma", {}) self.ui_epic_input = mma_sec.get("epic", "") at_data = mma_sec.get("active_track") if at_data: try: tickets = [] for t_data in at_data.get("tickets", []): tickets.append(Ticket(**t_data)) self.active_track = Track( id=at_data.get("id"), description=at_data.get("description"), tickets=tickets ) self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table except Exception as e: print(f"Failed to deserialize active track: {e}") self.active_track = None else: self.active_track = None self.active_tickets = [] # Load track-scoped history if track is active if self.active_track: track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) if track_history: self.disc_entries = _parse_history_entries(track_history, self.disc_roles) def _cb_load_track(self, track_id: str) -> None: state = project_manager.load_track_state(track_id, self.ui_files_base_dir) if state: try: # Convert list[Ticket] or list[dict] to list[Ticket] for Track object tickets = [] for t in state.tasks: if isinstance(t, dict): tickets.append(Ticket(**t)) else: tickets.append(t) self.active_track = Track( id=state.metadata.id, description=state.metadata.name, tickets=tickets ) # Keep dicts for UI table (or convert Ticket objects back to dicts if needed) from dataclasses import asdict self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets] # Load track-scoped history history = project_manager.load_track_history(track_id, self.ui_files_base_dir) if history: self.disc_entries = _parse_history_entries(history, self.disc_roles) else: self.disc_entries = [] self._recalculate_session_usage() self.ai_status = f"Loaded track: {state.metadata.name}" except Exception as e: self.ai_status = f"Load track error: {e}" print(f"Error loading track {track_id}: {e}") def _save_active_project(self) -> None: if self.active_project_path: try: project_manager.save_project(self.project, self.active_project_path) except Exception as e: self.ai_status = f"save error: {e}" # ---------------------------------------------------------------- discussion management def _get_discussion_names(self) -> list[str]: if self._discussion_names_dirty: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) self._discussion_names_cache = sorted(discussions.keys()) self._discussion_names_dirty = False return self._discussion_names_cache def _switch_discussion(self, name: str) -> None: self._flush_disc_entries_to_project() disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if name not in discussions: self.ai_status = f"discussion not found: {name}" return self.active_discussion = name disc_sec["active"] = name self._discussion_names_dirty = True disc_data = discussions[name] self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles) self.ai_status = f"discussion: {name}" def _flush_disc_entries_to_project(self) -> None: history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] if self.active_track: project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir) return disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) disc_data["history"] = history_strings disc_data["last_updated"] = project_manager.now_ts() def _create_discussion(self, name: str) -> None: disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) if name in discussions: self.ai_status = f"discussion '{name}' already exists" return discussions[name] = project_manager.default_discussion() self._discussion_names_dirty = True self._switch_discussion(name) def _rename_discussion(self, old_name: str, new_name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if old_name not in discussions: return if new_name in discussions: self.ai_status = f"discussion '{new_name}' already exists" return discussions[new_name] = discussions.pop(old_name) self._discussion_names_dirty = True if self.active_discussion == old_name: self.active_discussion = new_name disc_sec["active"] = new_name def _delete_discussion(self, name: str) -> None: disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if len(discussions) <= 1: self.ai_status = "cannot delete the last discussion" return if name not in discussions: return del discussions[name] self._discussion_names_dirty = True if self.active_discussion == name: remaining = sorted(discussions.keys()) self._switch_discussion(remaining[0]) # ---------------------------------------------------------------- logic def _on_comms_entry(self, entry: dict) -> None: session_logger.log_comms(entry) entry["local_ts"] = time.time() # If this is a history_add kind, route it to history queue instead if entry.get("kind") == "history_add": payload = entry.get("payload", {}) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": payload.get("role", "AI"), "content": payload.get("content", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) return with self._pending_comms_lock: self._pending_comms.append(entry) def _on_tool_log(self, script: str, result: str) -> None: session_logger.log_tool_call(script, result, None) with self._pending_tool_calls_lock: self._pending_tool_calls.append((script, result, time.time())) def _on_api_event(self, *args, **kwargs) -> None: payload = kwargs.get("payload", {}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload}) def _on_performance_alert(self, message: str) -> None: """Called by PerformanceMonitor when a threshold is exceeded.""" alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load." # Inject into history as a 'System' message with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "System", "content": alert_text, "ts": project_manager.now_ts() }) def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: return with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] self._pending_gui_tasks.clear() for task in tasks: try: action = task.get("action") if action == "refresh_api_metrics": self._refresh_api_metrics(task.get("payload", {})) elif action == "handle_ai_response": payload = task.get("payload", {}) text = payload.get("text", "") stream_id = payload.get("stream_id") if stream_id: self.mma_streams[stream_id] = text if stream_id == "Tier 1": if "status" in payload: self.ai_status = payload["status"] else: self.ai_response = text self.ai_status = payload.get("status", "done") self._trigger_blink = True if self.ui_auto_add_history and not stream_id: role = payload.get("role", "AI") with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts() }) elif action == "show_track_proposal": self.proposed_tracks = task.get("payload", []) self._show_track_proposal_modal = True elif action == "mma_state_update": payload = task.get("payload", {}) self.mma_status = payload.get("status", "idle") self.active_tier = payload.get("active_tier") self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage) self.active_track = payload.get("track") self.active_tickets = payload.get("tickets", []) elif action == "set_value": item = task.get("item") value = task.get("value") if item in self._settable_fields: attr_name = self._settable_fields[item] setattr(self, attr_name, value) if item == "gcli_path": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=value) else: ai_client._gemini_cli_adapter.binary_path = value elif action == "click": item = task.get("item") user_data = task.get("user_data") if item == "btn_project_new_automated": self._cb_new_project_automated(user_data) elif item in self._clickable_actions: # Check if it's a method that accepts user_data import inspect func = self._clickable_actions[item] try: sig = inspect.signature(func) if 'user_data' in sig.parameters: func(user_data=user_data) else: func() except Exception: func() elif action == "select_list_item": item = task.get("listbox", task.get("item")) value = task.get("item_value", task.get("value")) if item == "disc_listbox": self._switch_discussion(value) elif task.get("type") == "ask": self._pending_ask_dialog = True self._ask_request_id = task.get("request_id") self._ask_tool_data = task.get("data", {}) elif action == "clear_ask": if self._ask_request_id == task.get("request_id"): self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None elif action == "custom_callback": cb = task.get("callback") args = task.get("args", []) if callable(cb): try: cb(*args) except Exception as e: print(f"Error in direct custom callback: {e}") elif cb in self._predefined_callbacks: self._predefined_callbacks[cb](*args) elif action == "mma_step_approval": dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload")) self._pending_mma_approval = task if "dialog_container" in task: task["dialog_container"][0] = dlg elif action == "mma_spawn_approval": dlg = MMASpawnApprovalDialog( task.get("ticket_id"), task.get("role"), task.get("prompt"), task.get("context_md") ) self._pending_mma_spawn = task self._mma_spawn_prompt = task.get("prompt", "") self._mma_spawn_context = task.get("context_md", "") self._mma_spawn_open = True self._mma_spawn_edit_mode = False if "dialog_container" in task: task["dialog_container"][0] = dlg except Exception as e: print(f"Error executing GUI task: {e}") def _handle_approve_script(self) -> None: """Logic for approving a pending script via API hooks.""" print("[DEBUG] _handle_approve_script called") with self._pending_dialog_lock: if self._pending_dialog: print(f"[DEBUG] Approving dialog for: {self._pending_dialog._script[:50]}...") with self._pending_dialog._condition: self._pending_dialog._approved = True self._pending_dialog._done = True self._pending_dialog._condition.notify_all() self._pending_dialog = None else: print("[DEBUG] No pending dialog to approve") def _handle_reject_script(self) -> None: """Logic for rejecting a pending script via API hooks.""" print("[DEBUG] _handle_reject_script called") with self._pending_dialog_lock: if self._pending_dialog: print(f"[DEBUG] Rejecting dialog for: {self._pending_dialog._script[:50]}...") with self._pending_dialog._condition: self._pending_dialog._approved = False self._pending_dialog._done = True self._pending_dialog._condition.notify_all() self._pending_dialog = None else: print("[DEBUG] No pending dialog to reject") def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None) -> None: if self._pending_mma_approval: dlg = self._pending_mma_approval.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved if payload is not None: dlg._payload = payload dlg._done = True dlg._condition.notify_all() self._pending_mma_approval = None if self._pending_mma_spawn: dlg = self._pending_mma_spawn.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved dlg._abort = abort if prompt is not None: dlg._prompt = prompt if context_md is not None: dlg._context_md = context_md dlg._done = True dlg._condition.notify_all() self._pending_mma_spawn = None def _handle_approve_ask(self) -> None: """Responds with approval for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post(): try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": True}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reject_ask(self) -> None: """Responds with rejection for a pending /api/ask request.""" if not self._ask_request_id: return request_id = self._ask_request_id def do_post(): try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": False}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reset_session(self) -> None: """Logic for resetting the AI session.""" ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.disc_entries.clear() # Clear history in project dict too disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if self.active_discussion in discussions: discussions[self.active_discussion]["history"] = [] self.ai_status = "session reset" self.ai_response = "" self.ui_ai_input = "" def _handle_md_only(self) -> None: """Logic for the 'MD Only' action.""" try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" # Refresh token budget metrics with CURRENT md self._refresh_api_metrics({}, md_content=md) except Exception as e: self.ai_status = f"error: {e}" def _handle_generate_send(self) -> None: """Logic for the 'Gen + Send' action.""" try: md, path, file_items, stable_md, disc_text = self._do_generate() self.last_md = md self.last_md_path = path self.last_file_items = file_items except Exception as e: self.ai_status = f"generate error: {e}" return self.ai_status = "sending..." user_msg = self.ui_ai_input base_dir = self.ui_files_base_dir # Prepare event payload event_payload = events.UserRequestEvent( prompt=user_msg, stable_md=stable_md, file_items=file_items, disc_text=disc_text, base_dir=base_dir ) # Push to async queue asyncio.run_coroutine_threadsafe( self.event_queue.put("user_request", event_payload), self._loop ) def _run_event_loop(self) -> None: """Runs the internal asyncio event loop.""" asyncio.set_event_loop(self._loop) self._loop.create_task(self._process_event_queue()) self._loop.run_forever() def shutdown(self) -> None: """Cleanly shuts down the app's background tasks.""" if self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) # Join other threads if they exist if self.send_thread and self.send_thread.is_alive(): self.send_thread.join(timeout=1.0) if self.models_thread and self.models_thread.is_alive(): self.models_thread.join(timeout=1.0) async def _process_event_queue(self) -> None: """Listens for and processes events from the AsyncEventQueue.""" while True: event_name, payload = await self.event_queue.get() if event_name == "user_request": # Handle the request (simulating what was previously in do_send thread) self._handle_request_event(payload) elif event_name == "response": # Handle AI response event with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": payload }) elif event_name == "mma_state_update": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_state_update", "payload": payload }) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """Processes a UserRequestEvent by calling the AI client.""" if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": event.prompt, "collapsed": False, "ts": project_manager.now_ts() }) csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit) ai_client.set_agent_tools(self.ui_agent_tools) try: resp = ai_client.send(event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text) # Emit response event asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": resp, "status": "done"}), self._loop ) except ProviderError as e: asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}), self._loop ) except Exception as e: asyncio.run_coroutine_threadsafe( self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}), self._loop ) def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" # Note: This file path is relative to where the test is run. # This is for testing purposes only. with open("temp_callback_output.txt", "w") as f: f.write(data) def _recalculate_session_usage(self) -> None: usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0} for entry in ai_client.get_comms_log(): if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): u = entry["payload"]["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in usage: usage[k] += u.get(k, 0) or 0 self.session_usage = usage def _refresh_api_metrics(self, payload: dict, md_content: str | None = None) -> None: if "latency" in payload: self.session_usage["last_latency"] = payload["latency"] self._recalculate_session_usage() def fetch_stats(): try: stats = ai_client.get_history_bleed_stats(md_content=md_content or self.last_md) self._token_budget_pct = stats.get("percentage", 0.0) / 100.0 self._token_budget_current = stats.get("current", 0) self._token_budget_limit = stats.get("limit", 0) except Exception: pass threading.Thread(target=fetch_stats, daemon=True).start() cache_stats = payload.get("cache_stats") if cache_stats: count = cache_stats.get("cache_count", 0) size_bytes = cache_stats.get("total_size_bytes", 0) self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" def cb_load_prior_log(self) -> None: root = hide_tk_root() path = filedialog.askopenfilename( title="Load Session Log", initialdir="logs", filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")] ) root.destroy() if not path: return entries = [] try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: continue except Exception as e: self.ai_status = f"log load error: {e}" return self.prior_session_entries = entries self.is_viewing_prior_session = True self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)" def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None: print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}") dialog = ConfirmDialog(script, base_dir) is_headless = "--headless" in sys.argv if is_headless: with self._pending_dialog_lock: self._pending_actions[dialog._uid] = dialog print(f"[PENDING_ACTION] Created action {dialog._uid}") else: with self._pending_dialog_lock: self._pending_dialog = dialog # Notify API hook subscribers if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): print("[DEBUG] Pushing script_confirmation_required event to queue") with self._api_event_queue_lock: self._api_event_queue.append({ "type": "script_confirmation_required", "action_id": dialog._uid, "script": str(script), "base_dir": str(base_dir), "ts": time.time() }) approved, final_script = dialog.wait() if is_headless: with self._pending_dialog_lock: if dialog._uid in self._pending_actions: del self._pending_actions[dialog._uid] print(f"[DEBUG] _confirm_and_run result: approved={approved}") if not approved: self._append_tool_log(final_script, "REJECTED by user") return None self.ai_status = "running powershell..." print(f"[DEBUG] Running powershell in {base_dir}") output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback) self._append_tool_log(final_script, output) self.ai_status = "powershell done, awaiting AI..." return output def resolve_pending_action(self, action_id: str, approved: bool) -> bool: """Resolves a pending PowerShell script confirmation by its ID. Args: action_id: The unique identifier for the pending action. approved: True if the script should be executed, False otherwise. Returns: bool: True if the action was found and resolved, False otherwise. """ with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True elif self._pending_dialog and self._pending_dialog._uid == action_id: dialog = self._pending_dialog with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True return False def _append_tool_log(self, script: str, result: str) -> None: self._tool_log.append((script, result, time.time())) self.ui_last_script_text = script self.ui_last_script_output = result self._trigger_script_blink = True self.show_script_output = True if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True def _flush_to_project(self) -> None: proj = self.project proj.setdefault("output", {})["output_dir"] = self.ui_output_dir proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir proj["files"]["paths"] = self.files proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir proj["screenshots"]["paths"] = self.screenshots proj.setdefault("project", {}) proj["project"]["git_dir"] = self.ui_project_git_dir proj["project"]["system_prompt"] = self.ui_project_system_prompt proj["project"]["main_context"] = self.ui_project_main_context proj["project"]["word_wrap"] = self.ui_word_wrap proj["project"]["summary_only"] = self.ui_summary_only proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path proj.setdefault("agent", {}).setdefault("tools", {}) for t_name in AGENT_TOOL_NAMES: proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) self._flush_disc_entries_to_project() disc_sec = proj.setdefault("discussion", {}) disc_sec["roles"] = self.disc_roles disc_sec["active"] = self.active_discussion disc_sec["auto_add"] = self.ui_auto_add_history # Save MMA State mma_sec = proj.setdefault("mma", {}) mma_sec["epic"] = self.ui_epic_input if self.active_track: # We only persist the basic metadata if full serialization is too complex # For now, let's try full serialization via asdict from dataclasses import asdict mma_sec["active_track"] = asdict(self.active_track) else: mma_sec["active_track"] = None def _flush_to_config(self) -> None: self.config["ai"] = { "provider": self.current_provider, "model": self.current_model, "temperature": self.temperature, "max_tokens": self.max_tokens, "history_trunc_limit": self.history_trunc_limit, } self.config["ai"]["system_prompt"] = self.ui_global_system_prompt self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} self.config["gui"] = {"show_windows": self.show_windows} theme.save_to_config(self.config) def _do_generate(self) -> tuple[str, Path, list, str, str]: """Returns (full_md, output_path, file_items, stable_md, discussion_text).""" self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) track_id = self.active_track.id if self.active_track else None flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) full_md, path, file_items = aggregate.run(flat) # Build stable markdown (no history) for Gemini caching screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) screenshots = flat.get("screenshots", {}).get("paths", []) summary_only = flat.get("project", {}).get("summary_only", False) stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only) # Build discussion history text separately history = flat.get("discussion", {}).get("history", []) discussion_text = aggregate.build_discussion_text(history) return full_md, path, file_items, stable_md, discussion_text def _fetch_models(self, provider: str) -> None: self.ai_status = "fetching models..." def do_fetch(): try: models = ai_client.list_models(provider) self.available_models = models if self.current_model not in models and models: self.current_model = models[0] ai_client.set_provider(self.current_provider, self.current_model) self.ai_status = f"models loaded: {len(models)}" except Exception as e: self.ai_status = f"model fetch error: {e}" self.models_thread = threading.Thread(target=do_fetch, daemon=True) self.models_thread.start() # ---------------------------------------------------------------- helpers def _render_text_viewer(self, label: str, content: str) -> None: if imgui.button("[+]##" + str(id(content))): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content def _render_heavy_text(self, label: str, content: str) -> None: imgui.text_colored(C_LBL, f"{label}:") imgui.same_line() if imgui.button("[+]##" + label): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content if len(content) > COMMS_CLAMP_CHARS: if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(content) imgui.pop_text_wrap_pos() else: if imgui.begin_child(f"heavy_text_child_{label}", imgui.ImVec2(0, 80), True): imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() else: if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(content if content else "(empty)") imgui.pop_text_wrap_pos() else: imgui.text(content if content else "(empty)") # ---------------------------------------------------------------- gui def _show_menus(self) -> None: if imgui.begin_menu("Windows"): for w in self.show_windows.keys(): _, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w]) imgui.end_menu() if imgui.begin_menu("Project"): if imgui.menu_item("Save All", "", False)[0]: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" if imgui.menu_item("Reset Session", "", False)[0]: ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.ai_status = "session reset" self.ai_response = "" if imgui.menu_item("Generate MD Only", "", False)[0]: try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" except Exception as e: self.ai_status = f"error: {e}" imgui.end_menu() def _gui_func(self) -> None: try: self.perf_monitor.start_frame() # Process GUI task queue self._process_pending_gui_tasks() self._render_track_proposal_modal() # Auto-save (every 60s) now = time.time() if now - self._last_autosave >= self._autosave_interval: self._last_autosave = now try: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) except Exception: pass # silent — don't disrupt the GUI loop # Sync pending comms with self._pending_comms_lock: if self._pending_comms and self.ui_auto_scroll_comms: self._scroll_comms_to_bottom = True for c in self._pending_comms: self._comms_log.append(c) self._pending_comms.clear() with self._pending_tool_calls_lock: if self._pending_tool_calls and self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True for tc in self._pending_tool_calls: self._tool_log.append(tc) self._pending_tool_calls.clear() # Sync pending history adds with self._pending_history_adds_lock: if self._pending_history_adds: self._scroll_disc_to_bottom = True for item in self._pending_history_adds: if item["role"] not in self.disc_roles: self.disc_roles.append(item["role"]) self.disc_entries.append(item) self._pending_history_adds.clear() # ---- Menubar if imgui.begin_main_menu_bar(): if imgui.begin_menu("manual slop"): if imgui.menu_item("Quit", "Ctrl+Q", False)[0]: self.should_quit = True imgui.end_menu() if imgui.begin_menu("View"): for name in self.show_windows: _, self.show_windows[name] = imgui.menu_item(name, "", self.show_windows[name]) imgui.end_menu() if imgui.begin_menu("Project"): if imgui.menu_item("Save All", "Ctrl+S", False)[0]: self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" if imgui.menu_item("Generate MD Only", "", False)[0]: self._handle_md_only() if imgui.menu_item("Reset Session", "", False)[0]: self._handle_reset_session() imgui.end_menu() imgui.end_main_menu_bar() # --- Hubs --- if self.show_windows.get("Context Hub", False): exp, self.show_windows["Context Hub"] = imgui.begin("Context Hub", self.show_windows["Context Hub"]) if exp: self._render_projects_panel() imgui.end() if self.show_windows.get("Files & Media", False): exp, self.show_windows["Files & Media"] = imgui.begin("Files & Media", self.show_windows["Files & Media"]) if exp: if imgui.collapsing_header("Files"): self._render_files_panel() if imgui.collapsing_header("Screenshots"): self._render_screenshots_panel() imgui.end() if self.show_windows.get("AI Settings", False): exp, self.show_windows["AI Settings"] = imgui.begin("AI Settings", self.show_windows["AI Settings"]) if exp: if imgui.collapsing_header("Provider & Model"): self._render_provider_panel() if imgui.collapsing_header("System Prompts"): self._render_system_prompts_panel() imgui.end() if self.show_windows.get("MMA Dashboard", False): exp, self.show_windows["MMA Dashboard"] = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"]) if exp: self._render_mma_dashboard() imgui.end() if self.show_windows.get("Theme", False): self._render_theme_panel() if self.show_windows.get("Discussion Hub", False): exp, self.show_windows["Discussion Hub"] = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"]) if exp: # Top part for the history if imgui.begin_child("HistoryChild", size=(0, -200)): self._render_discussion_panel() imgui.end_child() # Bottom part with tabs for message and response if imgui.begin_tab_bar("MessageResponseTabs"): if imgui.begin_tab_item("Message")[0]: self._render_message_panel() imgui.end_tab_item() if imgui.begin_tab_item("Response")[0]: self._render_response_panel() imgui.end_tab_item() imgui.end_tab_bar() imgui.end() if self.show_windows.get("Operations Hub", False): exp, self.show_windows["Operations Hub"] = imgui.begin("Operations Hub", self.show_windows["Operations Hub"]) if exp: if imgui.begin_tab_bar("OperationsTabs"): if imgui.begin_tab_item("Tool Calls")[0]: self._render_tool_calls_panel() imgui.end_tab_item() if imgui.begin_tab_item("Comms History")[0]: self._render_comms_history_panel() imgui.end_tab_item() imgui.end_tab_bar() imgui.end() if self.show_windows.get("Log Management", False): self._render_log_management() if self.show_windows["Diagnostics"]: exp, self.show_windows["Diagnostics"] = imgui.begin("Diagnostics", self.show_windows["Diagnostics"]) if exp: now = time.time() if now - self._perf_last_update >= 0.5: self._perf_last_update = now metrics = self.perf_monitor.get_metrics() self.perf_history["frame_time"].pop(0) self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0)) self.perf_history["fps"].pop(0) self.perf_history["fps"].append(metrics.get("fps", 0.0)) self.perf_history["cpu"].pop(0) self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0)) self.perf_history["input_lag"].pop(0) self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0)) metrics = self.perf_monitor.get_metrics() imgui.text("Performance Telemetry") imgui.separator() if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h): imgui.table_setup_column("Metric") imgui.table_setup_column("Value") imgui.table_headers_row() imgui.table_next_row() imgui.table_next_column() imgui.text("FPS") imgui.table_next_column() imgui.text(f"{metrics.get('fps', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Frame Time (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}") imgui.table_next_row() imgui.table_next_column() imgui.text("CPU %") imgui.table_next_column() imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Input Lag (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}") imgui.end_table() imgui.separator() imgui.text("Frame Time (ms)") imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60)) imgui.text("CPU %") imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60)) imgui.end() self.perf_monitor.end_frame() # ---- Modals / Popups with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: if not self._pending_dialog_open: imgui.open_popup("Approve PowerShell Command") self._pending_dialog_open = True else: self._pending_dialog_open = False if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]: if not dlg: imgui.close_current_popup() else: imgui.text("The AI wants to run the following PowerShell script:") imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}") imgui.separator() # Checkbox to toggle full preview inside modal _, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer) if self.show_text_viewer: imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True) imgui.text_unformatted(dlg._script) imgui.end_child() else: ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200)) imgui.separator() if imgui.button("Approve & Run", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.same_line() if imgui.button("Reject", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.end_popup() if self._pending_ask_dialog: if not self._ask_dialog_open: imgui.open_popup("Approve Tool Execution") self._ask_dialog_open = True else: self._ask_dialog_open = False if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_ask_dialog: imgui.close_current_popup() else: tool_name = self._ask_tool_data.get("tool", "unknown") tool_args = self._ask_tool_data.get("args", {}) imgui.text("The AI wants to execute a tool:") imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}") imgui.separator() imgui.text("Arguments:") imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True) imgui.text_unformatted(json.dumps(tool_args, indent=2)) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_approve_ask() imgui.close_current_popup() imgui.same_line() if imgui.button("Deny", imgui.ImVec2(120, 0)): self._handle_reject_ask() imgui.close_current_popup() imgui.end_popup() # MMA Step Approval Modal if self._pending_mma_approval: if not self._mma_approval_open: imgui.open_popup("MMA Step Approval") self._mma_approval_open = True self._mma_approval_edit_mode = False self._mma_approval_payload = self._pending_mma_approval.get("payload", "") else: self._mma_approval_open = False if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_approval: imgui.close_current_popup() else: ticket_id = self._pending_mma_approval.get("ticket_id", "??") imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.") imgui.separator() if self._mma_approval_edit_mode: imgui.text("Edit Raw Payload (Manual Memory Mutation):") _, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400)) else: imgui.text("Proposed Tool Call:") imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True) imgui.text_unformatted(str(self._pending_mma_approval.get("payload", ""))) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, payload=self._mma_approval_payload) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)): self._mma_approval_edit_mode = not self._mma_approval_edit_mode imgui.same_line() if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False) imgui.close_current_popup() imgui.end_popup() # MMA Spawn Approval Modal if self._pending_mma_spawn: if not self._mma_spawn_open: imgui.open_popup("MMA Spawn Approval") self._mma_spawn_open = True self._mma_spawn_edit_mode = False self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "") self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "") else: self._mma_spawn_open = False if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_spawn: imgui.close_current_popup() else: role = self._pending_mma_spawn.get("role", "??") ticket_id = self._pending_mma_spawn.get("ticket_id", "??") imgui.text(f"Spawning {role} for Ticket {ticket_id}") imgui.separator() if self._mma_spawn_edit_mode: imgui.text("Edit Prompt:") _, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200)) imgui.text("Edit Context MD:") _, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300)) else: imgui.text("Proposed Prompt:") imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True) imgui.text_unformatted(self._mma_spawn_prompt) imgui.end_child() imgui.text("Proposed Context MD:") imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True) imgui.text_unformatted(self._mma_spawn_context) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)): self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode imgui.same_line() if imgui.button("Abort", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False, abort=True) imgui.close_current_popup() imgui.end_popup() if self.show_script_output: if self._trigger_script_blink: self._trigger_script_blink = False self._is_script_blinking = True self._script_blink_start_time = time.time() try: imgui.set_window_focus("Last Script Output") except Exception: pass if self._is_script_blinking: elapsed = time.time() - self._script_blink_start_time if elapsed > 1.5: self._is_script_blinking = False else: val = math.sin(elapsed * 8 * math.pi) alpha = 60/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha)) imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever) expanded, self.show_script_output = imgui.begin("Last Script Output", self.show_script_output) if expanded: imgui.text("Script:") imgui.same_line() self._render_text_viewer("Last Script", self.ui_last_script_text) if self.ui_word_wrap: imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_text) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only) imgui.separator() imgui.text("Output:") imgui.same_line() self._render_text_viewer("Last Output", self.ui_last_script_output) if self.ui_word_wrap: imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_output) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) if self._is_script_blinking: imgui.pop_style_color(2) imgui.end() if self.show_text_viewer: imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever) expanded, self.show_text_viewer = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer) if expanded: if self.ui_word_wrap: imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.text_viewer_content) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end() except Exception as e: print(f"ERROR in _gui_func: {e}") import traceback traceback.print_exc() def _render_projects_panel(self) -> None: proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem) imgui.text_colored(C_IN, f"Active: {proj_name}") imgui.separator() imgui.text("Git Directory") ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir) imgui.same_line() if imgui.button("Browse##git"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Git Directory") r.destroy() if d: self.ui_project_git_dir = d imgui.separator() imgui.text("Main Context File") ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context) imgui.same_line() if imgui.button("Browse##ctx"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select Main Context File") r.destroy() if p: self.ui_project_main_context = p imgui.separator() imgui.text("Output Dir") ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir) imgui.same_line() if imgui.button("Browse##out"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Output Dir") r.destroy() if d: self.ui_output_dir = d imgui.separator() imgui.text("Project Files") imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True) for i, pp in enumerate(self.project_paths): is_active = (pp == self.active_project_path) if imgui.button(f"x##p{i}"): removed = self.project_paths.pop(i) if removed == self.active_project_path and self.project_paths: self._switch_project(self.project_paths[0]) break imgui.same_line() marker = " *" if is_active else "" if is_active: imgui.push_style_color(imgui.Col_.text, C_IN) if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"): self._switch_project(pp) if is_active: imgui.pop_style_color() imgui.same_line() imgui.text_colored(C_LBL, pp) imgui.end_child() if imgui.button("Add Project"): r = hide_tk_root() p = filedialog.askopenfilename( title="Select Project .toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")], ) r.destroy() if p and p not in self.project_paths: self.project_paths.append(p) imgui.same_line() if imgui.button("New Project"): r = hide_tk_root() p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")]) r.destroy() if p: name = Path(p).stem proj = project_manager.default_project(name) project_manager.save_project(proj, p) if p not in self.project_paths: self.project_paths.append(p) self._switch_project(p) imgui.same_line() if imgui.button("Save All"): self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "config saved" ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap) ch, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only) ch, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms) ch, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls) if imgui.collapsing_header("Agent Tools"): for t_name in AGENT_TOOL_NAMES: val = self.ui_agent_tools.get(t_name, True) ch, val = imgui.checkbox(f"Enable {t_name}", val) if ch: self.ui_agent_tools[t_name] = val imgui.separator() imgui.text_colored(C_LBL, 'MMA Orchestration') _, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80)) if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)): self._cb_plan_epic() def _cb_plan_epic(self) -> None: def _bg_task(): try: self.ai_status = "Planning Epic (Tier 1)..." history = orchestrator_pm.get_track_history_summary() proj = project_manager.load_project(self.active_project_path) flat = project_manager.flat_config(proj) file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", [])) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": { "text": json.dumps(tracks, indent=2), "stream_id": "Tier 1", "status": "Epic tracks generated." } }) self._pending_gui_tasks.append({ "action": "show_track_proposal", "payload": tracks }) except Exception as e: self.ai_status = f"Epic plan error: {e}" print(f"ERROR in _cb_plan_epic background task: {e}") threading.Thread(target=_bg_task, daemon=True).start() def _cb_accept_tracks(self) -> None: def _bg_task(): for track_data in self.proposed_tracks: self._start_track_logic(track_data) self.ai_status = "Tracks accepted and execution started." threading.Thread(target=_bg_task, daemon=True).start() def _cb_start_track(self, user_data: Any = None) -> None: idx = 0 if isinstance(user_data, int): idx = user_data elif isinstance(user_data, dict): idx = user_data.get("index", 0) if 0 <= idx < len(self.proposed_tracks): track_data = self.proposed_tracks[idx] title = track_data.get("title") or track_data.get("goal", "Untitled Track") threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() self.ai_status = f"Track '{title}' started." def _start_track_logic(self, track_data: dict[str, Any]) -> None: try: goal = track_data.get("goal", "") title = track_data.get("title") or track_data.get("goal", "Untitled Track") self.ai_status = f"Phase 2: Generating tickets for {title}..." # 1. Get skeletons for context parser = ASTParser(language="python") skeletons = "" for i, file_path in enumerate(self.files): try: self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..." abs_path = Path(self.ui_files_base_dir) / file_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n" except Exception as e: print(f"Error parsing skeleton for {file_path}: {e}") self.ai_status = "Phase 2: Calling Tech Lead..." raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) if not raw_tickets: self.ai_status = f"Error: No tickets generated for track: {title}" print(f"Warning: No tickets generated for track: {title}") return self.ai_status = "Phase 2: Sorting tickets..." try: sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) except ValueError as e: print(f"Dependency error in track '{title}': {e}") sorted_tickets_data = raw_tickets # 3. Create Track and Ticket objects tickets = [] for t_data in sorted_tickets_data: ticket = Ticket( id=t_data["id"], description=t_data.get("description") or t_data.get("goal", "No description"), status=t_data.get("status", "todo"), assigned_to=t_data.get("assigned_to", "unassigned"), depends_on=t_data.get("depends_on", []), step_mode=t_data.get("step_mode", False) ) tickets.append(ticket) track_id = f"track_{uuid.uuid4().hex[:8]}" track = Track(id=track_id, description=title, tickets=tickets) # Initialize track state in the filesystem from models import TrackState, Metadata from datetime import datetime now = datetime.now() meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now) state = TrackState(metadata=meta, discussion=[], tasks=tickets) project_manager.save_track_state(track_id, state, self.ui_files_base_dir) # 4. Initialize ConductorEngine and run loop engine = multi_agent_conductor.ConductorEngine(track, self.event_queue) # Use current full markdown context for the track execution track_id_param = track.id flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) full_md, _, _ = aggregate.run(flat) # Schedule the coroutine on the internal event loop asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop) except Exception as e: self.ai_status = f"Track start error: {e}" print(f"ERROR in _start_track_logic: {e}") def _render_track_proposal_modal(self) -> None: if self._show_track_proposal_modal: imgui.open_popup("Track Proposal") if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]: imgui.text_colored(C_IN, "Proposed Implementation Tracks") imgui.separator() if not self.proposed_tracks: imgui.text("No tracks generated.") else: for idx, track in enumerate(self.proposed_tracks): imgui.text_colored(C_LBL, f"Track {idx+1}: {track.get('title', 'Untitled')}") imgui.text_wrapped(f"Goal: {track.get('goal', 'N/A')}") if imgui.button(f"Start This Track##{idx}"): self._cb_start_track(idx) imgui.separator() if imgui.button("Accept", imgui.ImVec2(120, 0)): self._cb_accept_tracks() self._show_track_proposal_modal = False imgui.close_current_popup() imgui.same_line() if imgui.button("Cancel", imgui.ImVec2(120, 0)): self._show_track_proposal_modal = False imgui.close_current_popup() imgui.end_popup() def _render_log_management(self) -> None: exp, self.show_windows["Log Management"] = imgui.begin("Log Management", self.show_windows["Log Management"]) if not exp: imgui.end() return registry = LogRegistry("logs/log_registry.toml") sessions = registry.data if imgui.begin_table("sessions_table", 7, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Session ID") imgui.table_setup_column("Start Time") imgui.table_setup_column("Star") imgui.table_setup_column("Reason") imgui.table_setup_column("Size (KB)") imgui.table_setup_column("Msgs") imgui.table_setup_column("Actions") imgui.table_headers_row() for session_id, s_data in sessions.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(session_id) imgui.table_next_column() imgui.text(s_data.get("start_time", "")) imgui.table_next_column() whitelisted = s_data.get("whitelisted", False) if whitelisted: imgui.text_colored(vec4(255, 215, 0), "YES") else: imgui.text("NO") metadata = s_data.get("metadata") or {} imgui.table_next_column() imgui.text(metadata.get("reason", "")) imgui.table_next_column() imgui.text(str(metadata.get("size_kb", ""))) imgui.table_next_column() imgui.text(str(metadata.get("message_count", ""))) imgui.table_next_column() if whitelisted: if imgui.button(f"Unstar##{session_id}"): registry.update_session_metadata( session_id, message_count=metadata.get("message_count"), errors=metadata.get("errors"), size_kb=metadata.get("size_kb"), whitelisted=False, reason=metadata.get("reason") ) else: if imgui.button(f"Star##{session_id}"): registry.update_session_metadata( session_id, message_count=metadata.get("message_count"), errors=metadata.get("errors"), size_kb=metadata.get("size_kb"), whitelisted=True, reason="Manually whitelisted" ) imgui.end_table() imgui.end() def _render_files_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir) imgui.same_line() if imgui.button("Browse##fb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_files_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True) for i, f in enumerate(self.files): if imgui.button(f"x##f{i}"): self.files.pop(i) break imgui.same_line() imgui.text(f) imgui.end_child() if imgui.button("Add File(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames() r.destroy() for p in paths: if p not in self.files: self.files.append(p) imgui.same_line() if imgui.button("Add Wildcard"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.files.append(str(Path(d) / "**" / "*")) def _render_screenshots_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir) imgui.same_line() if imgui.button("Browse##sb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_shots_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True) for i, s in enumerate(self.screenshots): if imgui.button(f"x##s{i}"): self.screenshots.pop(i) break imgui.same_line() imgui.text(s) imgui.end_child() if imgui.button("Add Screenshot(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames( title="Select Screenshots", filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")], ) r.destroy() for p in paths: if p not in self.screenshots: self.screenshots.append(p) def _render_discussion_panel(self) -> None: # THINKING indicator is_thinking = self.ai_status in ["sending..."] if is_thinking: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...") imgui.separator() # Prior session viewing mode if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20)) imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION") imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() imgui.separator() imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False) for idx, entry in enumerate(self.prior_session_entries): imgui.push_id(f"prior_{idx}") kind = entry.get("kind", entry.get("type", "")) imgui.text_colored(C_LBL, f"#{idx+1}") imgui.same_line() ts = entry.get("ts", entry.get("timestamp", "")) if ts: imgui.text_colored(vec4(160, 160, 160), str(ts)) imgui.same_line() imgui.text_colored(C_KEY, str(kind)) payload = entry.get("payload", entry) text = payload.get("text", payload.get("message", payload.get("content", ""))) if text: preview = str(text).replace("\\n", " ")[:200] if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(preview) imgui.pop_text_wrap_pos() else: imgui.text(preview) imgui.separator() imgui.pop_id() imgui.end_child() imgui.pop_style_color() return if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open): names = self._get_discussion_names() if imgui.begin_combo("##disc_sel", self.active_discussion): for name in names: is_selected = (name == self.active_discussion) if imgui.selectable(name, is_selected)[0]: self._switch_discussion(name) if is_selected: imgui.set_item_default_focus() imgui.end_combo() disc_sec = self.project.get("discussion", {}) disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) git_commit = disc_data.get("git_commit", "") last_updated = disc_data.get("last_updated", "") imgui.text_colored(C_LBL, "commit:") imgui.same_line() imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)") imgui.same_line() if imgui.button("Update Commit"): git_dir = self.ui_project_git_dir if git_dir: cmt = project_manager.get_git_commit(git_dir) if cmt: disc_data["git_commit"] = cmt disc_data["last_updated"] = project_manager.now_ts() self.ai_status = f"commit: {cmt[:12]}" imgui.text_colored(C_LBL, "updated:") imgui.same_line() imgui.text_colored(C_SUB, last_updated if last_updated else "(never)") ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input) imgui.same_line() if imgui.button("Create"): nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Rename"): nm = self.ui_disc_new_name_input.strip() if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Delete"): self._delete_discussion(self.active_discussion) if not self.is_viewing_prior_session: imgui.separator() if imgui.button("+ Entry"): self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()}) imgui.same_line() if imgui.button("-All"): for e in self.disc_entries: e["collapsed"] = True imgui.same_line() if imgui.button("+All"): for e in self.disc_entries: e["collapsed"] = False imgui.same_line() if imgui.button("Clear All"): self.disc_entries.clear() imgui.same_line() if imgui.button("Save"): self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) self.ai_status = "discussion saved" ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history) # Truncation controls imgui.text("Keep Pairs:") imgui.same_line() imgui.set_next_item_width(80) ch, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1) if self.ui_disc_truncate_pairs < 1: self.ui_disc_truncate_pairs = 1 imgui.same_line() if imgui.button("Truncate"): self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs) self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs" imgui.separator() if imgui.collapsing_header("Roles"): imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True) for i, r in enumerate(self.disc_roles): if imgui.button(f"x##r{i}"): self.disc_roles.pop(i) break imgui.same_line() imgui.text(r) imgui.end_child() ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input) imgui.same_line() if imgui.button("Add"): r = self.ui_disc_new_role_input.strip() if r and r not in self.disc_roles: self.disc_roles.append(r) self.ui_disc_new_role_input = "" imgui.separator() imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False) clipper = imgui.ListClipper() clipper.begin(len(self.disc_entries)) while clipper.step(): for i in range(clipper.display_start, clipper.display_end): entry = self.disc_entries[i] imgui.push_id(str(i)) collapsed = entry.get("collapsed", False) read_mode = entry.get("read_mode", False) if imgui.button("+" if collapsed else "-"): entry["collapsed"] = not collapsed imgui.same_line() imgui.set_next_item_width(120) if imgui.begin_combo("##role", entry["role"]): for r in self.disc_roles: if imgui.selectable(r, r == entry["role"])[0]: entry["role"] = r imgui.end_combo() if not collapsed: imgui.same_line() if imgui.button("[Edit]" if read_mode else "[Read]"): entry["read_mode"] = not read_mode ts_str = entry.get("ts", "") if ts_str: imgui.same_line() imgui.text_colored(vec4(120, 120, 100), str(ts_str)) if collapsed: imgui.same_line() if imgui.button("Ins"): self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()}) imgui.same_line() self._render_text_viewer(f"Entry #{i+1}", entry["content"]) imgui.same_line() if imgui.button("Del"): self.disc_entries.pop(i) imgui.pop_id() break # Break from inner loop, clipper will re-step imgui.same_line() preview = entry["content"].replace("\\n", " ")[:60] if len(entry["content"]) > 60: preview += "..." imgui.text_colored(vec4(160, 160, 150), preview) if not collapsed: if read_mode: imgui.begin_child("read_content", imgui.ImVec2(0, 150), True) if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(entry["content"]) if self.ui_word_wrap: imgui.pop_text_wrap_pos() imgui.end_child() else: ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150)) imgui.separator() imgui.pop_id() if self._scroll_disc_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_disc_to_bottom = False imgui.end_child() def _render_provider_panel(self) -> None: imgui.text("Provider") if imgui.begin_combo("##prov", self.current_provider): for p in PROVIDERS: if imgui.selectable(p, p == self.current_provider)[0]: self.current_provider = p imgui.end_combo() imgui.separator() imgui.text("Model") imgui.same_line() if imgui.button("Fetch Models"): self._fetch_models(self.current_provider) if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)): for m in self.available_models: if imgui.selectable(m, m == self.current_model)[0]: self.current_model = m imgui.end_list_box() imgui.separator() imgui.text("Parameters") ch, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f") ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024) ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024) if self.current_provider == "gemini_cli": imgui.separator() imgui.text("Gemini CLI") sid = "None" if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: sid = ai_client._gemini_cli_adapter.session_id or "None" imgui.text(f"Session ID: {sid}") if imgui.button("Reset CLI Session"): ai_client.reset_session() imgui.text("Binary Path") ch, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path) imgui.same_line() if imgui.button("Browse##gcli"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select gemini CLI binary") r.destroy() if p: self.ui_gemini_cli_path = p if ch: if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path imgui.separator() imgui.text("Telemetry") usage = self.session_usage total = usage["input_tokens"] + usage["output_tokens"] if total == 0 and usage.get("total_tokens", 0) > 0: total = usage["total_tokens"] imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})") if usage.get("last_latency", 0.0) > 0: imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s") if usage["cache_read_input_tokens"]: imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}") imgui.text("Token Budget:") imgui.progress_bar(self._token_budget_pct, imgui.ImVec2(-1, 0), f"{self._token_budget_current:,} / {self._token_budget_limit:,}") if self._gemini_cache_text: imgui.text_colored(C_SUB, self._gemini_cache_text) def _render_message_panel(self) -> None: # LIVE indicator is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."] if is_live: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, alpha), "LIVE") imgui.separator() ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40)) # Keyboard shortcuts io = imgui.get_io() ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter) ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l) if ctrl_l: self.ui_ai_input = "" imgui.separator() send_busy = False with self._send_thread_lock: if self.send_thread and self.send_thread.is_alive(): send_busy = True if (imgui.button("Gen + Send") or ctrl_enter) and not send_busy: self._handle_generate_send() imgui.same_line() if imgui.button("MD Only"): self._handle_md_only() imgui.same_line() if imgui.button("Reset"): self._handle_reset_session() imgui.same_line() if imgui.button("-> History"): if self.ui_ai_input: self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()}) def _render_response_panel(self) -> None: if self._trigger_blink: self._trigger_blink = False self._is_blinking = True self._blink_start_time = time.time() try: imgui.set_window_focus("Response") except: pass is_blinking = False if self._is_blinking: elapsed = time.time() - self._blink_start_time if elapsed > 1.5: self._is_blinking = False else: is_blinking = True val = math.sin(elapsed * 8 * math.pi) alpha = 50/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha)) # --- Always Render Content --- if self.ui_word_wrap: imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ai_response) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only) imgui.separator() if imgui.button("-> History"): if self.ai_response: self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()}) if is_blinking: imgui.pop_style_color(2) def _cb_ticket_retry(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'todo' break asyncio.run_coroutine_threadsafe( self.event_queue.put("mma_retry", {"ticket_id": ticket_id}), self._loop ) def _cb_ticket_skip(self, ticket_id: str) -> None: for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'skipped' break asyncio.run_coroutine_threadsafe( self.event_queue.put("mma_skip", {"ticket_id": ticket_id}), self._loop ) def _render_mma_dashboard(self) -> None: # 1. Track Browser imgui.text("Track Browser") if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Title") imgui.table_setup_column("Status") imgui.table_setup_column("Progress") imgui.table_setup_column("Actions") imgui.table_headers_row() for track in self.tracks: imgui.table_next_row() imgui.table_next_column() imgui.text(track.get("title", "Untitled")) imgui.table_next_column() imgui.text(track.get("status", "unknown")) imgui.table_next_column() progress = track.get("progress", 0.0) imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%") imgui.table_next_column() if imgui.button(f"Load##{track.get('id')}"): self._cb_load_track(track.get("id")) imgui.end_table() imgui.separator() # 2. Global Controls changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode) if changed: # We could push an event here if the engine needs to know immediately pass imgui.same_line() imgui.text(f"Status: {self.mma_status.upper()}") if self.active_tier: imgui.same_line() imgui.text_colored(C_VAL, f"| Active: {self.active_tier}") imgui.separator() # 2. Active Track Info if self.active_track: imgui.text(f"Track: {self.active_track.description}") # Progress bar tickets = self.active_tickets total = len(tickets) if total > 0: complete = sum(1 for t in tickets if t.get('status') == 'complete') progress = complete / total imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets") else: imgui.text_disabled("No active MMA track.") # 3. Token Usage Table imgui.separator() imgui.text("Tier Usage (Tokens)") if imgui.begin_table("mma_usage", 3, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg): imgui.table_setup_column("Tier") imgui.table_setup_column("Input") imgui.table_setup_column("Output") imgui.table_headers_row() usage = self.mma_tier_usage for tier, stats in usage.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(tier) imgui.table_next_column() imgui.text(f"{stats.get('input', 0):,}") imgui.table_next_column() imgui.text(f"{stats.get('output', 0):,}") imgui.end_table() imgui.separator() imgui.separator() imgui.text("Strategy (Tier 1)") strategy_text = self.mma_streams.get("Tier 1", "") imgui.input_text_multiline("##mma_strategy", strategy_text, imgui.ImVec2(-1, 150), imgui.InputTextFlags_.read_only) # 4. Task DAG Visualizer imgui.text("Task DAG") if self.active_track: tickets_by_id = {t.get('id'): t for t in self.active_tickets} all_ids = set(tickets_by_id.keys()) # Build children map children_map = {} for t in self.active_tickets: for dep in t.get('depends_on', []): if dep not in children_map: children_map[dep] = [] children_map[dep].append(t.get('id')) # Roots are those whose depends_on elements are NOT in all_ids roots = [] for t in self.active_tickets: deps = t.get('depends_on', []) has_local_dep = any(d in all_ids for d in deps) if not has_local_dep: roots.append(t) rendered = set() for root in roots: self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered) else: imgui.text_disabled("No active MMA track.") def _render_ticket_dag_node(self, ticket: Ticket, tickets_by_id: dict[str, Ticket], children_map: dict[str, list[str]], rendered: set[str]) -> None: tid = ticket.get('id', '??') target = ticket.get('target_file', 'general') status = ticket.get('status', 'pending').upper() # Determine color status_color = vec4(200, 200, 200) # Gray (TODO) if status == 'RUNNING': status_color = vec4(255, 255, 0) # Yellow elif status == 'COMPLETE': status_color = vec4(0, 255, 0) # Green elif status in ['BLOCKED', 'ERROR']: status_color = vec4(255, 0, 0) # Red elif status == 'PAUSED': status_color = vec4(255, 165, 0) # Orange flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open children = children_map.get(tid, []) if not children: flags |= imgui.TreeNodeFlags_.leaf # Check if already rendered elsewhere to avoid infinite recursion or duplicate subtrees is_duplicate = tid in rendered node_open = imgui.tree_node_ex(f"##{tid}", flags) # Detail View / Tooltip if imgui.is_item_hovered(): imgui.begin_tooltip() imgui.text_colored(C_KEY, f"ID: {tid}") imgui.text_colored(C_LBL, f"Target: {target}") imgui.text_colored(C_LBL, f"Description:") imgui.same_line() imgui.text_wrapped(ticket.get('description', 'N/A')) deps = ticket.get('depends_on', []) if deps: imgui.text_colored(C_LBL, f"Depends on: {', '.join(deps)}") stream_key = f"Tier 3: {tid}" if stream_key in self.mma_streams: imgui.separator() imgui.text_colored(C_KEY, "Worker Stream:") imgui.text_wrapped(self.mma_streams[stream_key]) imgui.end_tooltip() imgui.same_line() imgui.text_colored(C_KEY, tid) imgui.same_line(150) imgui.text_disabled(str(target)) imgui.same_line(400) imgui.text_colored(status_color, status) imgui.same_line(500) if imgui.button(f"Retry##{tid}"): self._cb_ticket_retry(tid) imgui.same_line() if imgui.button(f"Skip##{tid}"): self._cb_ticket_skip(tid) if node_open: if not is_duplicate: rendered.add(tid) for child_id in children: child = tickets_by_id.get(child_id) if child: self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered) else: imgui.text_disabled(" (shown above)") imgui.tree_pop() def _render_tool_calls_panel(self) -> None: imgui.text("Tool call history") imgui.same_line() if imgui.button("Clear##tc"): self._tool_log.clear() imgui.separator() imgui.begin_child("tc_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar) log_copy = list(self._tool_log) for idx_minus_one, entry in enumerate(log_copy): idx = idx_minus_one + 1 # Handle both old (tuple) and new (tuple with ts) entries if len(entry) == 3: script, result, local_ts = entry else: script, result = entry local_ts = 0 # Blink effect blink_alpha = 0.0 if local_ts > 0: elapsed = time.time() - local_ts if elapsed < 3.0: blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5) imgui.push_id(f"tc_entry_{idx}") if blink_alpha > 0: imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha)) imgui.begin_group() first_line = script.strip().splitlines()[0][:80] if script.strip() else "(empty)" imgui.text_colored(C_KEY, f"Call #{idx}: {first_line}") # Script Display imgui.text_colored(C_LBL, "Script:") imgui.same_line() if imgui.button(f"[+]##script_{idx}"): self.show_text_viewer = True self.text_viewer_title = f"Call Script #{idx}" self.text_viewer_content = script if self.ui_word_wrap: imgui.begin_child(f"tc_script_wrap_{idx}", imgui.ImVec2(-1, 72), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(script) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.begin_child(f"tc_script_fixed_width_{idx}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar) imgui.input_text_multiline(f"##tc_script_res_{idx}", script, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() # Result Display imgui.text_colored(C_LBL, "Output:") imgui.same_line() if imgui.button(f"[+]##output_{idx}"): self.show_text_viewer = True self.text_viewer_title = f"Call Output #{idx}" self.text_viewer_content = result if self.ui_word_wrap: imgui.begin_child(f"tc_res_wrap_{idx}", imgui.ImVec2(-1, 72), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(result) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.begin_child(f"tc_res_fixed_width_{idx}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar) imgui.input_text_multiline(f"##tc_res_val_{idx}", result, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() imgui.separator() if blink_alpha > 0: imgui.end_group() imgui.pop_style_color() imgui.pop_id() if self._scroll_tool_calls_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_tool_calls_to_bottom = False imgui.end_child() def _render_comms_history_panel(self) -> None: imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}") imgui.same_line() if imgui.button("Clear##comms"): ai_client.clear_comms_log() self._comms_log.clear() imgui.same_line() if imgui.button("Load Log"): self.cb_load_prior_log() if self.is_viewing_prior_session: imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() self.ai_status = "idle" imgui.separator() imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION") imgui.separator() imgui.text_colored(C_OUT, "OUT") imgui.same_line() imgui.text_colored(C_REQ, "request") imgui.same_line() imgui.text_colored(C_TC, "tool_call") imgui.same_line() imgui.text(" ") imgui.same_line() imgui.text_colored(C_IN, "IN") imgui.same_line() imgui.text_colored(C_RES, "response") imgui.same_line() imgui.text_colored(C_TR, "tool_result") imgui.separator() # Use tinted background for prior session if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20)) imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar) log_to_render = self.prior_session_entries if self.is_viewing_prior_session else list(self._comms_log) for idx_minus_one, entry in enumerate(log_to_render): idx = idx_minus_one + 1 local_ts = entry.get("local_ts", 0) # Blink effect blink_alpha = 0.0 if local_ts > 0 and not self.is_viewing_prior_session: elapsed = time.time() - local_ts if elapsed < 3.0: blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5) imgui.push_id(f"comms_{idx}") if blink_alpha > 0: # Draw a background highlight for the entry draw_list = imgui.get_window_draw_list() p_min = imgui.get_cursor_screen_pos() # Estimate height or just use a fixed height for the background # It's better to wrap the entry in a group or just use separators # For now, let's just use the style color push if we are sure we pop it imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha)) # We still need a child or a group to apply the background to imgui.begin_group() d = entry.get("direction", "IN") k = entry.get("kind", "response") imgui.text_colored(vec4(160, 160, 160), f"#{idx}") imgui.same_line() imgui.text_colored(vec4(160, 160, 160), entry.get("ts", "00:00:00")) imgui.same_line() imgui.text_colored(DIR_COLORS.get(d, C_VAL), d) imgui.same_line() imgui.text_colored(KIND_COLORS.get(k, C_VAL), k) imgui.same_line() imgui.text_colored(C_LBL, f"{entry.get('provider', '?')}/{entry.get('model', '?')}") payload = entry.get("payload", {}) if k == "request": self._render_heavy_text("message", payload.get("message", "")) elif k == "response": imgui.text_colored(C_LBL, "round:") imgui.same_line() imgui.text_colored(C_VAL, str(payload.get("round", ""))) imgui.text_colored(C_LBL, "stop_reason:") imgui.same_line() imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", ""))) text = payload.get("text", "") if text: self._render_heavy_text("text", text) imgui.text_colored(C_LBL, "tool_calls:") tcs = payload.get("tool_calls", []) if not tcs: imgui.text_colored(C_VAL, " (none)") for tc_i, tc in enumerate(tcs): imgui.text_colored(C_KEY, f" call[{tc_i}] {tc.get('name', '?')}") if tc.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(tc["id"])) if "args" in tc or "input" in tc: self._render_heavy_text(f"call_{tc_i}_args", str(tc.get("args") or tc.get("input"))) elif k == "tool_call": imgui.text_colored(C_KEY, payload.get("name", "?")) if payload.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(payload["id"])) if "script" in payload: self._render_heavy_text("script", payload["script"]) if "args" in payload: self._render_heavy_text("args", str(payload["args"])) elif k == "tool_result": imgui.text_colored(C_KEY, payload.get("name", "?")) if payload.get("id"): imgui.text_colored(C_LBL, " id:") imgui.same_line() imgui.text_colored(C_VAL, str(payload["id"])) if "output" in payload: self._render_heavy_text("output", payload["output"]) if "results" in payload: for r_i, r in enumerate(payload["results"]): imgui.text_colored(C_LBL, f" Result[{r_i}]:") self._render_heavy_text(f"res_{r_i}", str(r)) if "usage" in payload: u = payload["usage"] u_str = f"In: {u.get('input_tokens', 0)} Out: {u.get('output_tokens', 0)}" if u.get("cache_read_input_tokens"): u_str += f" (Cache: {u['cache_read_input_tokens']})" imgui.text_colored(C_SUB, f" Usage: {u_str}") imgui.separator() if blink_alpha > 0: imgui.end_group() imgui.pop_style_color() imgui.pop_id() if self._scroll_comms_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_comms_to_bottom = False imgui.end_child() if self.is_viewing_prior_session: imgui.pop_style_color() def _render_system_prompts_panel(self) -> None: imgui.text("Global System Prompt (all projects)") ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100)) imgui.separator() imgui.text("Project System Prompt") ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100)) def _render_theme_panel(self) -> None: exp, self.show_windows["Theme"] = imgui.begin("Theme", self.show_windows["Theme"]) if exp: imgui.text("Palette") cp = theme.get_current_palette() if imgui.begin_combo("##pal", cp): for p in theme.get_palette_names(): if imgui.selectable(p, p == cp)[0]: theme.apply(p) imgui.end_combo() imgui.separator() imgui.text("Font") imgui.push_item_width(-150) ch, path = imgui.input_text("##fontp", theme.get_current_font_path()) imgui.pop_item_width() if ch: theme._current_font_path = path imgui.same_line() if imgui.button("Browse##font"): r = hide_tk_root() p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")]) r.destroy() if p: theme._current_font_path = p imgui.text("Size (px)") imgui.same_line() imgui.push_item_width(100) ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f") if ch: theme._current_font_size = size imgui.pop_item_width() imgui.same_line() if imgui.button("Apply Font (Requires Restart)"): self._flush_to_config() save_config(self.config) self.ai_status = "Font settings saved. Restart required." imgui.separator() imgui.text("UI Scale (DPI)") ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f") if ch: theme.set_scale(scale) imgui.end() def _load_fonts(self) -> None: font_path, font_size = theme.get_font_loading_params() if font_path and Path(font_path).exists(): hello_imgui.load_font(font_path, font_size) def _post_init(self) -> None: theme.apply_current() def run(self) -> None: """Initializes the ImGui runner and starts the main application loop.""" if "--headless" in sys.argv: print("Headless mode active") self._fetch_models(self.current_provider) import uvicorn headless_cfg = self.config.get("headless", {}) port = headless_cfg.get("port", 8000) api = self.create_api() uvicorn.run(api, host="0.0.0.0", port=port) else: theme.load_from_config(self.config) self.runner_params = hello_imgui.RunnerParams() self.runner_params.app_window_params.window_title = "manual slop" self.runner_params.app_window_params.window_geometry.size = (1680, 1200) self.runner_params.imgui_window_params.enable_viewports = False self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space self.runner_params.fps_idling.enable_idling = False self.runner_params.imgui_window_params.show_menu_bar = True self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder self.runner_params.ini_filename = "manualslop_layout.ini" self.runner_params.callbacks.show_gui = self._gui_func self.runner_params.callbacks.show_menus = self._show_menus self.runner_params.callbacks.load_additional_fonts = self._load_fonts self.runner_params.callbacks.post_init = self._post_init self._fetch_models(self.current_provider) # Start API hooks server (if enabled) self.hook_server = api_hooks.HookServer(self) self.hook_server.start() immapp.run(self.runner_params) # On exit self.hook_server.stop() self.perf_monitor.stop() ai_client.cleanup() # Destroy active API caches to stop billing self._flush_to_project() self._save_active_project() self._flush_to_config() save_config(self.config) session_logger.close_session() def main() -> None: app = App() app.run() if __name__ == "__main__": main()