Files
manual_slop/gui_2.py

3568 lines
139 KiB
Python

# gui_2.py
from __future__ import annotations
import tomli_w
import threading
import asyncio
import time
import math
import json
import sys
import os
import uuid
import requests
from pathlib import Path
from tkinter import filedialog, Tk
from typing import Optional, Callable, Any, Dict, List, Tuple, Union
import aggregate
import ai_client
import cost_tracker
from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import theme_2 as theme
import tomllib
import events
import numpy as np
import api_hooks
import mcp_client
import orchestrator_pm
from performance_monitor import PerformanceMonitor
from log_registry import LogRegistry
from log_pruner import LogPruner
import conductor_tech_lead
import multi_agent_conductor
from models import Track, Ticket
from file_cache import ASTParser
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from imgui_bundle import imgui, hello_imgui, immapp
# Relative path of the application-wide TOML config read by load_config() and written by save_config().
CONFIG_PATH: Path = Path("config.toml")
# Provider identifiers known to this module.
PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"]
# NOTE(review): presumably the character count at which comms text is clamped for display;
# its use is outside this view — confirm.
COMMS_CLAMP_CHARS: int = 300
def load_config() -> dict[str, Any]:
    """Parse the TOML file at ``CONFIG_PATH`` and return its contents as a dict.

    Errors from ``open`` or ``tomllib.load`` are not caught here.
    """
    with open(CONFIG_PATH, mode="rb") as config_file:
        parsed = tomllib.load(config_file)
    return parsed
def save_config(config: dict[str, Any]) -> None:
    """Serialize ``config`` as TOML and overwrite the file at ``CONFIG_PATH``."""
    with open(CONFIG_PATH, mode="wb") as config_file:
        tomli_w.dump(config, config_file)
def hide_tk_root() -> Tk:
    """Create a Tk root window, withdraw it, set its ``-topmost`` attribute, and return it."""
    hidden_root = Tk()
    hidden_root.withdraw()
    hidden_root.wm_attributes("-topmost", True)
    return hidden_root
# Color Helpers
def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4:
    """Build an ``imgui.ImVec4`` from 0-255 RGB channels.

    Each of ``r``, ``g`` and ``b`` is divided by 255; ``a`` is passed through unscaled.
    """
    red, green, blue = r / 255, g / 255, b / 255
    return imgui.ImVec4(red, green, blue, a)
# Named colors built with vec4() from 0-255 RGB channels (alpha left at its 1.0 default).
# vec4() is annotated as returning imgui.ImVec4, so the constants are annotated accordingly.
C_OUT: imgui.ImVec4 = vec4(100, 200, 255)
C_IN: imgui.ImVec4 = vec4(140, 255, 160)
C_REQ: imgui.ImVec4 = vec4(255, 220, 100)
C_RES: imgui.ImVec4 = vec4(180, 255, 180)
C_TC: imgui.ImVec4 = vec4(255, 180, 80)
C_TR: imgui.ImVec4 = vec4(180, 220, 255)
C_TRS: imgui.ImVec4 = vec4(200, 180, 255)
C_LBL: imgui.ImVec4 = vec4(180, 180, 180)
C_VAL: imgui.ImVec4 = vec4(220, 220, 220)
C_KEY: imgui.ImVec4 = vec4(140, 200, 255)
C_NUM: imgui.ImVec4 = vec4(180, 255, 180)
C_SUB: imgui.ImVec4 = vec4(220, 200, 120)
# Color lookup keyed by direction string ("OUT" / "IN").
DIR_COLORS: dict[str, imgui.ImVec4] = {"OUT": C_OUT, "IN": C_IN}
# Color lookup keyed by entry kind string.
KIND_COLORS: dict[str, imgui.ImVec4] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
# NOTE(review): presumably payload keys whose values are large and get special rendering;
# the consumer is outside this view — confirm.
HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"}
# Default discussion roles; used when a project or caller supplies none.
DISC_ROLES: list[str] = ["User", "AI", "Vendor API", "System"]
# Tool names for which App keeps a per-tool enabled flag (see App.ui_agent_tools).
AGENT_TOOL_NAMES: list[str] = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]
def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]:
    """Keep only the trailing ``max_pairs * 2`` items of ``entries``.

    Returns an empty list when ``max_pairs`` is zero or negative. When
    ``entries`` already fits within the limit, the same list object is
    returned (not a copy); otherwise a new list holding the tail is returned.
    """
    if max_pairs > 0:
        keep = 2 * max_pairs
        return entries if keep >= len(entries) else entries[-keep:]
    return []
def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict[str, Any]]:
known = roles if roles is not None else DISC_ROLES
entries = []
for raw in history:
entry = project_manager.str_to_entry(raw, known)
entries.append(entry)
return entries
class ConfirmDialog:
    """A script awaiting approval, with a blocking ``wait`` for the decision.

    ``_done`` and ``_approved`` are set from outside this class; ``wait``
    polls ``_done`` under ``_condition``.
    """

    def __init__(self, script: str, base_dir: str) -> None:
        """Store the script and base directory as strings (``None`` becomes ``""``)."""
        self._uid = str(uuid.uuid4())
        self._script = "" if script is None else str(script)
        self._base_dir = "" if base_dir is None else str(base_dir)
        self._condition = threading.Condition()
        self._done = False
        self._approved = False

    def wait(self) -> tuple[bool, str]:
        """Block until ``_done`` is true, then return ``(approved, script)``."""
        with self._condition:
            while True:
                if self._done:
                    break
                # Wake at least every 100 ms to re-check the flag.
                self._condition.wait(timeout=0.1)
            return self._approved, self._script
class MMAApprovalDialog:
    """An MMA ticket payload awaiting approval, with a blocking ``wait``.

    ``_done``, ``_approved`` and (optionally) ``_payload`` are updated from
    outside this class before ``wait`` returns.
    """

    def __init__(self, ticket_id: str, payload: str) -> None:
        """Record the ticket id and payload; the dialog starts undecided."""
        self._ticket_id = ticket_id
        self._payload = payload
        self._condition = threading.Condition()
        self._done = False
        self._approved = False

    def wait(self) -> tuple[bool, str]:
        """Block until ``_done`` is true, then return ``(approved, payload)``."""
        with self._condition:
            while True:
                if self._done:
                    break
                # Wake at least every 100 ms to re-check the flag.
                self._condition.wait(timeout=0.1)
            return self._approved, self._payload
class MMASpawnApprovalDialog:
    """An MMA spawn request (role, prompt, context) awaiting approval.

    The decision flags are set from outside this class; ``wait`` blocks until
    ``_done`` is true and reports the outcome as a dict.
    """

    def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None:
        """Record the request fields; the dialog starts undecided."""
        self._ticket_id = ticket_id
        self._role = role
        self._prompt = prompt
        self._context_md = context_md
        self._condition = threading.Condition()
        self._done = False
        self._approved = False
        self._abort = False

    def wait(self) -> dict[str, Any]:
        """Block until ``_done`` is true, then return the decision.

        The result has the keys ``approved``, ``abort``, ``prompt`` and
        ``context_md``, read from the instance when the wait ends.
        """
        with self._condition:
            while True:
                if self._done:
                    break
                # Wake at least every 100 ms to re-check the flag.
                self._condition.wait(timeout=0.1)
            outcome = {
                'approved': self._approved,
                'abort': self._abort,
                'prompt': self._prompt,
                'context_md': self._context_md
            }
            return outcome
class GenerateRequest(BaseModel):
    """Request body accepted by the headless generate and stream endpoints."""
    # Prompt text to send; the generate endpoint rejects a blank prompt with HTTP 400.
    prompt: str
    # When true, the generate endpoint queues the prompt and the reply as history entries.
    auto_add_history: bool = True
    # Per-request overrides; None means the App's own temperature / max_tokens are used.
    temperature: float | None = None
    max_tokens: int | None = None
class ConfirmRequest(BaseModel):
    """Request body for the confirm endpoint: the decision on a pending action."""
    # True to approve the pending action, False to deny it.
    approved: bool
class App:
"""The main ImGui interface orchestrator for Manual Slop."""
def __init__(self) -> None:
    """Initialize all application state, then open the session log and wire up AI hooks.

    In order: create locks, load the config, start the event-loop thread,
    load the active project, seed UI/MMA/perf state, open a session log,
    start log pruning, and call ``_init_ai_and_hooks``.
    """
    # Initialize locks first to avoid initialization order issues
    self._send_thread_lock = threading.Lock()
    self._disc_entries_lock = threading.Lock()
    self._pending_comms_lock = threading.Lock()
    self._pending_tool_calls_lock = threading.Lock()
    self._pending_history_adds_lock = threading.Lock()
    self._pending_gui_tasks_lock = threading.Lock()
    self._pending_dialog_lock = threading.Lock()
    self._api_event_queue_lock = threading.Lock()
    # Global config, plus an event loop driven by a daemon thread running self._run_event_loop.
    self.config = load_config()
    self.event_queue = events.AsyncEventQueue()
    self._loop = asyncio.new_event_loop()
    self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
    self._loop_thread.start()
    # AI settings from the [ai] config section; the backing fields are set directly,
    # so the current_provider / current_model setters do not run here.
    ai_cfg = self.config.get("ai", {})
    self._current_provider: str = ai_cfg.get("provider", "gemini")
    self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
    self.available_models: list[str] = []
    self.temperature: float = ai_cfg.get("temperature", 0.0)
    self.max_tokens: int = ai_cfg.get("max_tokens", 8192)
    self.history_trunc_limit: int = ai_cfg.get("history_trunc_limit", 8000)
    # Project list and the active project (loaded by _load_active_project into self.project).
    projects_cfg = self.config.get("projects", {})
    self.project_paths: list[str] = list(projects_cfg.get("paths", []))
    self.active_project_path: str = projects_cfg.get("active", "")
    self.project: dict[str, Any] = {}
    self.active_discussion: str = "main"
    self._load_active_project()
    # Values derived from the loaded project.
    self.files: list[str] = list(self.project.get("files", {}).get("paths", []))
    self.screenshots: list[str] = list(self.project.get("screenshots", {}).get("paths", []))
    disc_sec = self.project.get("discussion", {})
    self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES)))
    self.active_discussion = disc_sec.get("active", "main")
    disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
    with self._disc_entries_lock:
        self.disc_entries: list[dict[str, Any]] = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
    self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
    self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".")
    self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".")
    proj_meta = self.project.get("project", {})
    self.ui_project_git_dir = proj_meta.get("git_dir", "")
    self.ui_project_main_context = proj_meta.get("main_context", "")
    self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
    self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
    self.ui_word_wrap = proj_meta.get("word_wrap", True)
    self.ui_summary_only = proj_meta.get("summary_only", False)
    self.ui_auto_add_history = disc_sec.get("auto_add", False)
    self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "")
    # Text-input buffers and last-result holders, all starting empty.
    self.ui_ai_input = ""
    self.ui_disc_new_name_input = ""
    self.ui_disc_new_role_input = ""
    self.ui_epic_input = ""
    self.proposed_tracks: list[dict[str, Any]] = []
    self._show_track_proposal_modal = False
    self.ui_new_track_name = ""
    self.ui_new_track_desc = ""
    self.ui_new_track_type = "feature"
    self.ui_conductor_setup_summary = ""
    self.ui_last_script_text = ""
    self.ui_last_script_output = ""
    self.ai_status = "idle"
    self.ai_response = ""
    self.last_md = ""
    self.last_md_path: Path | None = None
    self.last_file_items: list[Any] = []
    self.send_thread: threading.Thread | None = None
    self.models_thread: threading.Thread | None = None
    # Window visibility: values saved under [gui].show_windows override these defaults;
    # saved keys that are not listed here are ignored.
    _default_windows = {
        "Context Hub": True,
        "Files & Media": True,
        "AI Settings": True,
        "MMA Dashboard": True,
        "Tier 1: Strategy": True,
        "Tier 2: Tech Lead": True,
        "Tier 3: Workers": True,
        "Tier 4: QA": True,
        "Discussion Hub": True,
        "Operations Hub": True,
        "Theme": True,
        "Log Management": False,
        "Diagnostics": False,
    }
    saved = self.config.get("gui", {}).get("show_windows", {})
    self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()}
    self.show_script_output = False
    self.show_text_viewer = False
    self.text_viewer_title = ""
    self.text_viewer_content = ""
    # Pending script-confirmation state (read by create_api under _pending_dialog_lock).
    self._pending_dialog: ConfirmDialog | None = None
    self._pending_dialog_open = False
    self._pending_actions: dict[str, ConfirmDialog] = {}
    self._pending_ask_dialog = False
    self._ask_dialog_open = False
    self._ask_request_id = None
    self._ask_tool_data = None
    # MMA state: active track/tickets, approval dialogs, and per-tier usage counters.
    self.mma_step_mode = False
    self.active_track: Track | None = None
    self.active_tickets: list[dict[str, Any]] = []
    self.active_tier: str | None = None
    self.mma_status = "idle"
    self._pending_mma_approval: dict[str, Any] | None = None
    self._mma_approval_open = False
    self._mma_approval_edit_mode = False
    self._mma_approval_payload = ""
    self._pending_mma_spawn: dict[str, Any] | None = None
    self._mma_spawn_open = False
    self._mma_spawn_edit_mode = False
    self._mma_spawn_prompt = ''
    self._mma_spawn_context = ''
    self.mma_tier_usage = {
        "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
        "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"},
        "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
        "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
    }
    # Logs plus the pending queues that callbacks append to under the matching locks.
    self._tool_log: list[tuple[str, str, float]] = []
    self._comms_log: list[dict[str, Any]] = []
    self._pending_comms: list[dict[str, Any]] = []
    self._pending_tool_calls: list[tuple[str, str, float]] = []
    self._pending_history_adds: list[dict[str, Any]] = []
    # Blink and scroll flags.
    self._trigger_blink = False
    self._is_blinking = False
    self._blink_start_time = 0.0
    self._trigger_script_blink = False
    self._is_script_blinking = False
    self._script_blink_start_time = 0.0
    self._scroll_disc_to_bottom = False
    self._scroll_comms_to_bottom = False
    self._scroll_tool_calls_to_bottom = False
    self._pending_gui_tasks: list[dict[str, Any]] = []
    # Token accounting.
    self.session_usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
    self._token_budget_pct = 0.0
    self._token_budget_current = 0
    self._token_budget_limit = 0
    self._gemini_cache_text = ""
    self.ui_disc_truncate_pairs: int = 2
    # NOTE(review): these two are hard-coded here, whereas _refresh_from_project
    # reads them from the project's [project] section — confirm which is intended.
    self.ui_auto_scroll_comms = True
    self.ui_auto_scroll_tool_calls = True
    # Per-tool enabled flags; tools missing from the project config default to enabled.
    agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
    self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
    self.tracks: list[dict[str, Any]] = []
    # NOTE(review): the next four assignments repeat ones made earlier in this method.
    self.ui_conductor_setup_summary = ""
    self.ui_new_track_name = ""
    self.ui_new_track_desc = ""
    self.ui_new_track_type = "feature"
    self._show_add_ticket_form = False
    self.ui_new_ticket_id = ""
    self.ui_new_ticket_desc = ""
    self.ui_new_ticket_target = ""
    self.ui_new_ticket_deps = ""
    self._track_discussion_active = False
    self.mma_streams: dict[str, str] = {}
    self._tier_stream_last_len: dict[str, int] = {}
    self.is_viewing_prior_session = False
    self.prior_session_entries: list[dict[str, Any]] = []
    # Test hooks are enabled by the --enable-test-hooks flag or by SLOP_TEST_HOOKS=1.
    self.test_hooks_enabled = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
    # Performance monitoring: each history series is pre-filled with 100 zeros.
    self.perf_monitor = PerformanceMonitor()
    self.perf_history = {"frame_time": [0.0]*100, "fps": [0.0]*100, "cpu": [0.0]*100, "input_lag": [0.0]*100}
    self._perf_last_update = 0.0
    self._autosave_interval = 60.0
    self._last_autosave = time.time()
    # Open the session log labelled with the project name, then finish start-up.
    label = self.project.get("project", {}).get("name", "")
    session_logger.open_session(label=label)
    self._prune_old_logs()
    self._init_ai_and_hooks()
def _prune_old_logs(self) -> None:
    """Start log pruning on a daemon thread so the caller is not blocked.

    The worker builds a ``LogRegistry`` for ``logs/log_registry.toml`` and a
    ``LogPruner`` over ``logs``, then calls ``prune()``. Any exception is
    printed rather than raised.
    """
    def _prune_worker() -> None:
        try:
            LogPruner(LogRegistry("logs/log_registry.toml"), "logs").prune()
        except Exception as exc:
            print(f"Error during log pruning: {exc}")

    threading.Thread(target=_prune_worker, daemon=True).start()
@property
def current_provider(self) -> str:
    """Identifier of the currently selected AI provider."""
    return self._current_provider

@current_provider.setter
def current_provider(self, value: str) -> None:
    """Switch provider; does nothing when ``value`` is already current.

    On a change: resets the AI client session, points the client at the new
    provider with the current model, configures the Gemini CLI adapter when
    applicable, clears ``available_models``, and calls ``_fetch_models``.
    """
    if value != self._current_provider:
        self._current_provider = value
        ai_client.reset_session()
        ai_client.set_provider(value, self.current_model)
        if value == "gemini_cli":
            # Create the adapter on first use; otherwise just update its binary path.
            if not ai_client._gemini_cli_adapter:
                ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
            else:
                ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
            # hook_server may not exist yet if this setter runs before _init_ai_and_hooks finishes.
            # NOTE(review): SOURCE had no indentation; placing this restart inside the
            # gemini_cli branch is a reconstruction — confirm against the original file.
            if hasattr(self, 'hook_server'):
                self.hook_server.start()
        self.available_models = []
        self._fetch_models(value)
@property
def current_model(self) -> str:
    """Name of the currently selected model."""
    return self._current_model

@current_model.setter
def current_model(self, value: str) -> None:
    """Switch model; does nothing when ``value`` is already current.

    On a change, the AI client session is reset and the client is pointed at
    the current provider with the new model.
    """
    if value == self._current_model:
        return
    self._current_model = value
    ai_client.reset_session()
    ai_client.set_provider(self.current_provider, value)
def _init_ai_and_hooks(self) -> None:
    """Configure the AI client, register callbacks, and start the hook server.

    Steps, in order:
      1. Point ``ai_client`` at the current provider and model, and set up the
         Gemini CLI adapter when that provider is selected.
      2. Register this App's callbacks on ``ai_client``, ``mcp_client`` and
         the performance monitor, and subscribe to AI client events.
      3. Build the lookup tables ``_settable_fields``, ``_clickable_actions``
         and ``_predefined_callbacks``.
      4. Create and start ``api_hooks.HookServer``.
    """
    ai_client.set_provider(self.current_provider, self.current_model)
    if self.current_provider == "gemini_cli":
        # Create the adapter on first use; otherwise just update its binary path.
        if not ai_client._gemini_cli_adapter:
            ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
        else:
            ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
    ai_client.confirm_and_run_callback = self._confirm_and_run
    ai_client.comms_log_callback = self._on_comms_entry
    ai_client.tool_log_callback = self._on_tool_log
    mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
    self.perf_monitor.alert_callback = self._on_performance_alert
    # All three AI client events are routed to the same handler.
    for event_name in ("request_start", "response_received", "tool_execution"):
        ai_client.events.on(event_name, self._on_api_event)
    # External field name -> attribute name on this App.
    # NOTE(review): '_token_budget_label' and 'show_confirm_modal' are not
    # assigned in __init__; confirm they are defined elsewhere before use.
    self._settable_fields: dict[str, str] = {
        'ai_input': 'ui_ai_input',
        'project_git_dir': 'ui_project_git_dir',
        'auto_add_history': 'ui_auto_add_history',
        'disc_new_name_input': 'ui_disc_new_name_input',
        'project_main_context': 'ui_project_main_context',
        'gcli_path': 'ui_gemini_cli_path',
        'output_dir': 'ui_output_dir',
        'files_base_dir': 'ui_files_base_dir',
        'ai_status': 'ai_status',
        'ai_response': 'ai_response',
        'active_discussion': 'active_discussion',
        'current_provider': 'current_provider',
        'current_model': 'current_model',
        'token_budget_pct': '_token_budget_pct',
        'token_budget_current': '_token_budget_current',
        'token_budget_label': '_token_budget_label',
        'show_confirm_modal': 'show_confirm_modal',
        'mma_epic_input': 'ui_epic_input',
        'mma_status': 'mma_status',
        'mma_active_tier': 'active_tier',
        'ui_new_track_name': 'ui_new_track_name',
        'ui_new_track_desc': 'ui_new_track_desc'
    }
    # Button id -> handler. 'btn_approve_script' was listed twice with the same
    # handler; the redundant second entry is removed (resulting dict unchanged).
    self._clickable_actions: dict[str, Callable[..., Any]] = {
        'btn_reset': self._handle_reset_session,
        'btn_gen_send': self._handle_generate_send,
        'btn_md_only': self._handle_md_only,
        'btn_approve_script': self._handle_approve_script,
        'btn_reject_script': self._handle_reject_script,
        'btn_project_save': self._cb_project_save,
        'btn_disc_create': self._cb_disc_create,
        'btn_mma_plan_epic': self._cb_plan_epic,
        'btn_mma_accept_tracks': self._cb_accept_tracks,
        'btn_mma_start_track': self._cb_start_track,
        # The lambda reads the track form fields when clicked, not at registration time.
        'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
        'btn_approve_tool': self._handle_approve_tool,
        'btn_approve_mma_step': self._handle_approve_mma_step,
        'btn_approve_spawn': self._handle_approve_spawn,
    }
    self._predefined_callbacks: dict[str, Callable[..., Any]] = {
        '_test_callback_func_write_to_file': self._test_callback_func_write_to_file
    }
    # Cache for _get_discussion_names; starts dirty so the first call rebuilds it.
    self._discussion_names_cache: list[str] = []
    self._discussion_names_dirty: bool = True
    self.hook_server = api_hooks.HookServer(self)
    self.hook_server.start()
def create_api(self) -> FastAPI:
    """Creates and configures the FastAPI application for headless mode.

    Every route except ``/health`` depends on ``get_api_key``, which checks
    the ``X-API-KEY`` request header against the ``SLOP_API_KEY`` environment
    variable or, if that is empty, ``[headless].api_key`` from the config.

    Returns:
        The configured ``FastAPI`` instance; this method does not serve it.
    """
    # Local import: only the key check below needs it.
    import secrets

    api = FastAPI(title="Manual Slop Headless API")
    API_KEY_NAME = "X-API-KEY"
    # auto_error=False: a missing header reaches get_api_key as None instead of
    # being rejected by FastAPI, so this code produces the 403 itself.
    api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

    async def get_api_key(header_key: str = Depends(api_key_header)) -> str:
        """Validates the API key from the request header against configuration.

        Raises:
            HTTPException: 403 when no key is configured on the server, or when
                the header is missing or does not match.
        """
        headless_cfg = self.config.get("headless", {})
        config_key = headless_cfg.get("api_key", "").strip()
        env_key = os.environ.get("SLOP_API_KEY", "").strip()
        # The environment variable takes precedence over the config file.
        target_key = env_key or config_key
        if not target_key:
            raise HTTPException(status_code=403, detail="API Key not configured on server")
        # Constant-time comparison avoids leaking key contents through response
        # timing. Both sides are encoded because compare_digest rejects
        # non-ASCII str arguments.
        if header_key is not None and secrets.compare_digest(
            header_key.encode("utf-8"), target_key.encode("utf-8")
        ):
            return header_key
        raise HTTPException(status_code=403, detail="Could not validate API Key")

    @api.get("/health")
    def health() -> dict[str, str]:
        """Basic health check endpoint."""
        return {"status": "ok"}

    @api.get("/status", dependencies=[Depends(get_api_key)])
    def status() -> dict[str, Any]:
        """Returns the current status of the AI provider and active project."""
        return {
            "provider": self.current_provider,
            "model": self.current_model,
            "active_project": self.active_project_path,
            "ai_status": self.ai_status,
            "session_usage": self.session_usage
        }

    @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
    def pending_actions() -> list[dict[str, Any]]:
        """Lists all PowerShell scripts awaiting manual confirmation."""
        actions = []
        with self._pending_dialog_lock:
            for uid, dialog in self._pending_actions.items():
                actions.append({
                    "action_id": uid,
                    "script": dialog._script,
                    "base_dir": dialog._base_dir
                })
            # The single pending dialog is reported alongside the keyed actions.
            if self._pending_dialog:
                actions.append({
                    "action_id": self._pending_dialog._uid,
                    "script": self._pending_dialog._script,
                    "base_dir": self._pending_dialog._base_dir
                })
        return actions

    @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
    def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, Any]:
        """Approves or denies a pending PowerShell script execution."""
        success = self.resolve_pending_action(action_id, req.approved)
        if not success:
            raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found")
        return {"status": "success", "action_id": action_id, "approved": req.approved}

    @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
    def list_sessions() -> list[str]:
        """Lists all available session log files."""
        log_dir = Path("logs")
        if not log_dir.exists():
            return []
        return sorted([f.name for f in log_dir.glob("*.log")], reverse=True)

    @api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
    def get_session(filename: str) -> dict[str, str]:
        """Retrieves the content of a specific session log file."""
        # Reject anything that could escape the logs directory.
        if ".." in filename or "/" in filename or "\\" in filename:
            raise HTTPException(status_code=400, detail="Invalid filename")
        log_path = Path("logs") / filename
        if not log_path.exists() or not log_path.is_file():
            raise HTTPException(status_code=404, detail="Session log not found")
        try:
            content = log_path.read_text(encoding="utf-8")
            return {"filename": filename, "content": content}
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
    def delete_session(filename: str) -> dict[str, str]:
        """Deletes a specific session log file."""
        # Reject anything that could escape the logs directory.
        if ".." in filename or "/" in filename or "\\" in filename:
            raise HTTPException(status_code=400, detail="Invalid filename")
        log_path = Path("logs") / filename
        if not log_path.exists() or not log_path.is_file():
            raise HTTPException(status_code=404, detail="Session log not found")
        try:
            log_path.unlink()
            return {"status": "success", "message": f"Deleted {filename}"}
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

    @api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
    def get_context() -> dict[str, Any]:
        """Returns the current file and screenshot context configuration."""
        return {
            "files": self.files,
            "screenshots": self.screenshots,
            "files_base_dir": self.ui_files_base_dir,
            "screenshots_base_dir": self.ui_shots_base_dir
        }

    @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
    def generate(req: GenerateRequest) -> dict[str, Any]:
        """Triggers an AI generation request using the current project context.

        Raises:
            HTTPException: 400 for a blank prompt, 500 when context aggregation
                or the request itself fails, 502 for a ``ProviderError``.
        """
        if not req.prompt.strip():
            raise HTTPException(status_code=400, detail="Prompt cannot be empty")
        # The lock is held for the whole request, so generations are serialized.
        with self._send_thread_lock:
            start_time = time.time()
            try:
                md, path, file_items, stable_md, disc_text = self._do_generate()
                self.last_md = md
                self.last_md_path = path
                self.last_file_items = file_items
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") from e
            user_msg = req.prompt
            base_dir = self.ui_files_base_dir
            # Join the non-empty global and project system prompts.
            csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
            ai_client.set_custom_system_prompt("\n\n".join(csp))
            # Per-request overrides fall back to the App's configured values.
            temp = req.temperature if req.temperature is not None else self.temperature
            tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens
            ai_client.set_model_params(temp, tokens, self.history_trunc_limit)
            ai_client.set_agent_tools(self.ui_agent_tools)
            if req.auto_add_history:
                with self._pending_history_adds_lock:
                    self._pending_history_adds.append({
                        "role": "User",
                        "content": user_msg,
                        "collapsed": False,
                        "ts": project_manager.now_ts()
                    })
            try:
                resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text)
                if req.auto_add_history:
                    with self._pending_history_adds_lock:
                        self._pending_history_adds.append({
                            "role": "AI",
                            "content": resp,
                            "collapsed": False,
                            "ts": project_manager.now_ts()
                        })
                self._recalculate_session_usage()
                duration = time.time() - start_time
                return {
                    "text": resp,
                    "metadata": {
                        "provider": self.current_provider,
                        "model": self.current_model,
                        "duration_sec": round(duration, 3),
                        "timestamp": project_manager.now_ts()
                    },
                    "usage": self.session_usage
                }
            except ProviderError as e:
                raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") from e
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") from e

    @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)])
    async def stream(req: GenerateRequest) -> Any:
        """Placeholder for streaming AI generation responses (Not yet implemented)."""
        raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.")

    return api
# ---------------------------------------------------------------- project loading
def _cb_new_project_automated(self, user_data: Any) -> None:
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self) -> None:
    """Flush state to the project and config, save both, and set the status text."""
    # Project first: flush state into self.project, then save the project file.
    self._flush_to_project()
    self._save_active_project()
    # Then the global config: flush state into self.config and write it out.
    self._flush_to_config()
    save_config(self.config)
    # NOTE(review): this overwrites any "save error: ..." status that
    # _save_active_project may have just set — confirm that is intended.
    self.ai_status = "config saved"
def _cb_disc_create(self) -> None:
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _load_active_project(self) -> None:
    """Load a project into ``self.project``, falling back step by step.

    Order of attempts:
      1. ``active_project_path``, if set and present on disk.
      2. Each entry of ``project_paths`` that exists, in order; the first that
         loads becomes the active project.
      3. A project migrated from the legacy config, saved as ``<name>.toml``
         and registered in ``project_paths``.

    Load failures are printed and the next candidate is tried. Failures in
    the fallback loop were previously swallowed silently.
    """
    if self.active_project_path and Path(self.active_project_path).exists():
        try:
            self.project = project_manager.load_project(self.active_project_path)
            return
        except Exception as e:
            print(f"Failed to load project {self.active_project_path}: {e}")
    for pp in self.project_paths:
        if not Path(pp).exists():
            continue
        try:
            self.project = project_manager.load_project(pp)
            self.active_project_path = pp
            return
        except Exception as e:
            # Report the failure, matching the handling of the active path above.
            print(f"Failed to load project {pp}: {e}")
    # Nothing loadable: build a project from the legacy config and save it.
    self.project = project_manager.migrate_from_legacy_config(self.config)
    name = self.project.get("project", {}).get("name", "project")
    fallback_path = f"{name}.toml"
    project_manager.save_project(self.project, fallback_path)
    self.active_project_path = fallback_path
    if fallback_path not in self.project_paths:
        self.project_paths.append(fallback_path)
def _switch_project(self, path: str) -> None:
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
self._refresh_from_project()
self._discussion_names_dirty = True
ai_client.reset_session()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self) -> None:
    """Reload all project-derived attributes from ``self.project``.

    Covers file/screenshot lists, discussion roles and entries, UI settings,
    agent tool flags, the track list, and the saved MMA state. If an active
    track is restored and has its own history, that history replaces
    ``disc_entries``.

    If the saved active track cannot be deserialized, both ``active_track``
    and ``active_tickets`` are reset. Previously ``active_tickets`` kept its
    old contents in that case.
    """
    proj = self.project
    self.files = list(proj.get("files", {}).get("paths", []))
    self.screenshots = list(proj.get("screenshots", {}).get("paths", []))
    disc_sec = proj.get("discussion", {})
    self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
    self.active_discussion = disc_sec.get("active", "main")
    disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
    with self._disc_entries_lock:
        self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
    self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
    self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".")
    self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".")
    self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "")
    self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "")
    self.ui_project_main_context = proj.get("project", {}).get("main_context", "")
    self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini")
    self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
    self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True)
    self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True)
    self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True)
    self.ui_summary_only = proj.get("project", {}).get("summary_only", False)
    # Tools missing from the project config default to enabled.
    agent_tools_cfg = proj.get("agent", {}).get("tools", {})
    self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
    # MMA Tracks
    self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
    # Restore MMA state
    mma_sec = proj.get("mma", {})
    self.ui_epic_input = mma_sec.get("epic", "")
    at_data = mma_sec.get("active_track")
    if at_data:
        try:
            tickets = [Ticket(**t_data) for t_data in at_data.get("tickets", [])]
            self.active_track = Track(
                id=at_data.get("id"),
                description=at_data.get("description"),
                tickets=tickets
            )
            self.active_tickets = at_data.get("tickets", [])  # Keep dicts for UI table
        except Exception as e:
            print(f"Failed to deserialize active track: {e}")
            # Reset both together so no stale ticket rows remain without a track.
            self.active_track = None
            self.active_tickets = []
    else:
        self.active_track = None
        self.active_tickets = []
    # Load track-scoped history if track is active
    if self.active_track:
        track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
        if track_history:
            with self._disc_entries_lock:
                self.disc_entries = _parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str) -> None:
    """Load the saved state of ``track_id`` and make it the active track.

    Does nothing when no state is found. Otherwise it rebuilds ``active_track``
    and ``active_tickets``, replaces ``disc_entries`` with the track's history
    (empty when there is none), recalculates session usage, and reports the
    result in ``ai_status``. Any exception is caught, reported and printed.
    """
    state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
    if not state:
        return
    try:
        from dataclasses import asdict
        # Tasks may be stored as Ticket objects or plain dicts; normalize to Ticket.
        tickets = [Ticket(**task) if isinstance(task, dict) else task for task in state.tasks]
        self.active_track = Track(
            id=state.metadata.id,
            description=state.metadata.name,
            tickets=tickets
        )
        # The UI table works with dicts, so convert the Ticket objects back.
        self.active_tickets = [item if isinstance(item, dict) else asdict(item) for item in tickets]
        # Track-scoped history replaces whatever discussion was displayed.
        history = project_manager.load_track_history(track_id, self.ui_files_base_dir)
        with self._disc_entries_lock:
            self.disc_entries = _parse_history_entries(history, self.disc_roles) if history else []
        self._recalculate_session_usage()
        self.ai_status = f"Loaded track: {state.metadata.name}"
    except Exception as exc:
        self.ai_status = f"Load track error: {exc}"
        print(f"Error loading track {track_id}: {exc}")
def _save_active_project(self) -> None:
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self.ai_status = f"save error: {e}"
# ---------------------------------------------------------------- discussion management
def _get_discussion_names(self) -> list[str]:
if self._discussion_names_dirty:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
self._discussion_names_cache = sorted(discussions.keys())
self._discussion_names_dirty = False
return self._discussion_names_cache
def _switch_discussion(self, name: str) -> None:
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if name not in discussions:
self.ai_status = f"discussion not found: {name}"
return
self.active_discussion = name
self.active_discussion_idx = -1
discussions_root = self.project.get("discussions", [])
for i, d in enumerate(discussions_root):
if isinstance(d, dict) and d.get("title") == name:
self.active_discussion_idx = i
break
self._track_discussion_active = False
disc_sec["active"] = name
self._discussion_names_dirty = True
disc_data = discussions[name]
with self._disc_entries_lock:
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
self.ai_status = f"discussion: {name}"
def _flush_disc_entries_to_project(self) -> None:
    """Serialize ``disc_entries`` and store them where they belong.

    With an active track in track-scoped mode, the history goes to the
    track's history store. Otherwise it goes into the active discussion
    inside ``self.project`` (created if missing) with a fresh ``last_updated``.
    """
    serialized = [project_manager.entry_to_str(entry) for entry in self.disc_entries]
    if self.active_track and self._track_discussion_active:
        project_manager.save_track_history(self.active_track.id, serialized, self.ui_files_base_dir)
        return
    section = self.project.setdefault("discussion", {})
    by_name = section.setdefault("discussions", {})
    record = by_name.setdefault(self.active_discussion, project_manager.default_discussion())
    record["history"] = serialized
    record["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str) -> None:
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self.ai_status = f"discussion '{name}' already exists"
return
discussions[name] = project_manager.default_discussion()
self._discussion_names_dirty = True
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self.ai_status = f"discussion '{new_name}' already exists"
return
discussions[new_name] = discussions.pop(old_name)
self._discussion_names_dirty = True
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
self.ai_status = "cannot delete the last discussion"
return
if name not in discussions:
return
del discussions[name]
self._discussion_names_dirty = True
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
# ---------------------------------------------------------------- logic
def _on_comms_entry(self, entry: dict) -> None:
"""Record a comms entry and route it to the matching pending queue.

The entry is logged through ``session_logger.log_comms`` and stamped with a
``local_ts`` wall-clock time. ``tool_result`` / ``tool_call`` entries also
queue a collapsed discussion-history item and still reach the pending comms
queue; ``history_add`` entries are queued for the discussion history only.
"""
# sys.stderr.write(f"[DEBUG] _on_comms_entry: {entry.get('kind')} {entry.get('direction')}\n")
session_logger.log_comms(entry)
# Note: the timestamp is added after logging, so the logged entry has no local_ts.
entry["local_ts"] = time.time()
kind = entry.get("kind")
payload = entry.get("payload", {})
if kind in ("tool_result", "tool_call"):
# Tool results are attributed to "Tool", tool calls to "Vendor API".
role = "Tool" if kind == "tool_result" else "Vendor API"
content = ""
if kind == "tool_result":
content = payload.get("output", "")
else:
# First truthy of script / args / message describes the call.
content = payload.get("script") or payload.get("args") or payload.get("message", "")
# Dict content is rendered as JSON text before being embedded in the history entry.
if isinstance(content, dict):
content = json.dumps(content, indent=1)
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
# Header is the kind in upper case with underscores replaced by spaces.
"content": f"[{kind.upper().replace('_', ' ')}]\n{content}",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
})
# If this is a history_add kind, route it to history queue instead
if kind == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
# history_add entries stop here and are not appended to the comms queue.
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str) -> None:
    """Log a tool call and queue it (with a timestamp) for the tool-call view."""
    session_logger.log_tool_call(script, result, None)
    with self._pending_tool_calls_lock:
        stamped = (script, result, time.time())
        self._pending_tool_calls.append(stamped)
def _on_api_event(self, *args, **kwargs) -> None:
    """Queue a ``refresh_api_metrics`` GUI task for an API event.

    Positional arguments are ignored; only the ``payload`` keyword (default
    ``{}``) is forwarded.
    """
    task = {"action": "refresh_api_metrics", "payload": kwargs.get("payload", {})}
    with self._pending_gui_tasks_lock:
        self._pending_gui_tasks.append(task)
def _on_performance_alert(self, message: str) -> None:
    """Called by PerformanceMonitor when a threshold is exceeded.

    The alert is queued as a 'System' entry for the discussion history.
    """
    with self._pending_history_adds_lock:
        system_entry = {
            "role": "System",
            "content": f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load.",
            "ts": project_manager.now_ts(),
        }
        self._pending_history_adds.append(system_entry)
def _process_pending_gui_tasks(self) -> None:
"""Drain the pending GUI task queue and apply each task to the app state.

The queue is snapshotted and cleared under ``_pending_gui_tasks_lock``; the
tasks are then dispatched on their ``action`` key (``type`` for ask
requests). Any exception raised while handling a task is caught and printed.
"""
# Unlocked emptiness check; the snapshot below is taken under the lock.
if not self._pending_gui_tasks:
return
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}))
elif action == "handle_ai_response":
# AI reply: text carrying a stream_id is kept in self.mma_streams, other
# text in self.ai_response. "streaming..." chunks are appended, anything
# else replaces the stored text.
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
is_streaming = payload.get("status") == "streaming..."
if stream_id:
if is_streaming:
if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
else:
self.mma_streams[stream_id] = text
# Only the "Tier 1" stream propagates its status to the status line.
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
if is_streaming:
self.ai_response += text
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
# Queue the response for the discussion history when auto-add is enabled
# and the payload has no stream_id.
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "mma_stream_append":
# Append-only variant for stream text; tasks without a stream_id are ignored.
payload = task.get("payload", {})
stream_id = payload.get("stream_id")
text = payload.get("text", "")
if stream_id:
if stream_id not in self.mma_streams:
self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
# Copy the reported state; tier usage keeps its previous value when absent.
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_tickets = payload.get("tickets", [])
track_data = payload.get("track")
if track_data:
# Rebuild the active Track from the ticket dicts in the payload.
tickets = []
for t_data in self.active_tickets:
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=track_data.get("id"),
# The payload's "title" is stored as the track description.
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
# Only items listed in self._settable_fields can be assigned.
item = task.get("item")
value = task.get("value")
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
# gcli_path additionally creates or updates the Gemini CLI adapter.
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=value)
else:
ai_client._gemini_cli_adapter.binary_path = value
elif action == "click":
# Two items are handled explicitly; the rest go through self._clickable_actions.
item = task.get("item")
user_data = task.get("user_data")
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(user_data)
elif item in self._clickable_actions:
# Check if it's a method that accepts user_data
import inspect
func = self._clickable_actions[item]
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
except Exception:
# NOTE(review): an exception raised by the action itself also lands here,
# so the action can be invoked a second time — confirm this is intended.
func()
elif action == "select_list_item":
# "listbox"/"item_value" take precedence over the "item"/"value" keys.
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(value)
elif task.get("type") == "ask":
# Ask requests are identified by "type" rather than "action".
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
# Only clears the dialog state when the request id matches the pending one.
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
# The callback is either a callable or a key into self._predefined_callbacks.
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
# Build the dialog and publish it through the task's dialog_container slot, if provided.
dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload"))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval":
# Build the spawn dialog, remember the editable prompt/context and open the modal state.
dlg = MMASpawnApprovalDialog(
task.get("ticket_id"),
task.get("role"),
task.get("prompt"),
task.get("context_md")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
# Snapshot and clear the queue under its lock; the entries are processed afterwards.
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
self._scroll_disc_to_bottom = True
for item in items:
# NOTE(review): `role` is assigned here but not read again in this method.
role = item.get("role", "unknown")
# Register roles that have not been seen before.
if item.get("role") and item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
# Entries without a "disc_title" are treated as belonging to the active discussion.
if item.get("disc_title", self.active_discussion) == self.active_discussion:
# Identity check: skip the project-side append when disc_entries is the very
# list object stored as the discussion's history.
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def _handle_approve_script(self) -> None:
    """Logic for approving a pending script via API hooks.

    Marks the pending dialog as approved and done, wakes every waiter on its
    condition and clears the pending slot. Without a pending dialog only a
    debug line is printed.
    """
    print("[DEBUG] _handle_approve_script called")
    with self._pending_dialog_lock:
        if not self._pending_dialog:
            print("[DEBUG] No pending dialog to approve")
            return
        print(f"[DEBUG] Approving dialog for: {self._pending_dialog._script[:50]}...")
        dialog = self._pending_dialog
        with dialog._condition:
            dialog._approved = True
            dialog._done = True
            dialog._condition.notify_all()
        self._pending_dialog = None
def _handle_reject_script(self) -> None:
    """Logic for rejecting a pending script via API hooks.

    Marks the pending dialog as not approved but done, wakes every waiter on
    its condition and clears the pending slot. Without a pending dialog only
    a debug line is printed.
    """
    print("[DEBUG] _handle_reject_script called")
    with self._pending_dialog_lock:
        if not self._pending_dialog:
            print("[DEBUG] No pending dialog to reject")
            return
        print(f"[DEBUG] Rejecting dialog for: {self._pending_dialog._script[:50]}...")
        dialog = self._pending_dialog
        with dialog._condition:
            dialog._approved = False
            dialog._done = True
            dialog._condition.notify_all()
        self._pending_dialog = None
def _handle_approve_tool(self) -> None:
    """Logic for approving a pending tool execution via API hooks.

    Delegates to ``_handle_approve_ask`` when an ask dialog is pending.
    """
    print("[DEBUG] _handle_approve_tool called")
    if not self._pending_ask_dialog:
        print("[DEBUG] No pending tool approval found")
        return
    self._handle_approve_ask()
def _handle_approve_mma_step(self) -> None:
    """Logic for approving a pending MMA step execution via API hooks.

    Responds with approval (forwarding the current approval payload), then
    closes the approval modal state and clears the pending task.
    """
    print("[DEBUG] _handle_approve_mma_step called")
    if not self._pending_mma_approval:
        print("[DEBUG] No pending MMA step approval found")
        return
    self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
    self._mma_approval_open = False
    self._pending_mma_approval = None
def _handle_approve_spawn(self) -> None:
    """Logic for approving a pending sub-agent spawn via API hooks.

    Responds with approval using the currently stored spawn prompt and
    context, then closes the spawn modal state and clears the pending task.
    """
    print("[DEBUG] _handle_approve_spawn called")
    if not self._pending_mma_spawn:
        print("[DEBUG] No pending spawn approval found")
        return
    # Same response path the interactive handler uses.
    self._handle_mma_respond(
        approved=True,
        prompt=self._mma_spawn_prompt,
        context_md=self._mma_spawn_context,
    )
    # The modal state must be closed so the UI can continue.
    self._mma_spawn_open = False
    self._pending_mma_spawn = None
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None) -> None:
"""Deliver a decision to whichever MMA dialogs are currently pending.

Both the pending step approval and the pending spawn approval are examined
in the same call. Each dialog found in its task's ``dialog_container`` gets
the decision written while holding its condition, is marked done, and has
its waiters notified.

Args:
approved: Decision stored on the dialog(s).
payload: Replacement step payload; stored only when not None.
abort: Stored as ``_abort`` on the spawn dialog.
prompt: Replacement spawn prompt; stored only when not None.
context_md: Replacement spawn context; stored only when not None.
"""
if self._pending_mma_approval:
# A task without a "dialog_container" yields no dialog.
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
if self._pending_mma_spawn:
dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
dlg._abort = abort
if prompt is not None:
dlg._prompt = prompt
if context_md is not None:
dlg._context_md = context_md
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self) -> None:
    """Responds with approval for a pending /api/ask request.

    The HTTP response is posted from a daemon thread; the ask dialog state is
    cleared right after the thread is started. Does nothing when no request
    id is pending.
    """
    request_id = self._ask_request_id
    if not request_id:
        return

    def post_approval():
        body = {"request_id": request_id, "response": {"approved": True}}
        try:
            requests.post("http://127.0.0.1:8999/api/ask/respond", json=body, timeout=2)
        except Exception as e:
            print(f"Error responding to ask: {e}")

    threading.Thread(target=post_approval, daemon=True).start()
    self._pending_ask_dialog = False
    self._ask_request_id = None
    self._ask_tool_data = None
def _handle_reject_ask(self) -> None:
    """Responds with rejection for a pending /api/ask request.

    The HTTP response is posted from a daemon thread; the ask dialog state is
    cleared right after the thread is started. Does nothing when no request
    id is pending.
    """
    request_id = self._ask_request_id
    if not request_id:
        return

    def post_rejection():
        body = {"request_id": request_id, "response": {"approved": False}}
        try:
            requests.post("http://127.0.0.1:8999/api/ask/respond", json=body, timeout=2)
        except Exception as e:
            print(f"Error responding to ask: {e}")

    threading.Thread(target=post_rejection, daemon=True).start()
    self._pending_ask_dialog = False
    self._ask_request_id = None
    self._ask_tool_data = None
def _handle_reset_session(self) -> None:
    """Logic for resetting the AI session.

    Resets the AI client and its comms log, empties the tool log, comms log
    and discussion entries (including the active discussion's stored
    history), clears the response/input fields and drops queued history adds.
    """
    ai_client.reset_session()
    ai_client.clear_comms_log()
    self._tool_log.clear()
    self._comms_log.clear()
    self.disc_entries.clear()
    # The project dict keeps its own copy of the history; empty that too.
    registry = self.project.get("discussion", {}).get("discussions", {})
    if self.active_discussion in registry:
        registry[self.active_discussion]["history"] = []
    self.ai_status = "session reset"
    self.ai_response = ""
    self.ui_ai_input = ""
    with self._pending_history_adds_lock:
        self._pending_history_adds.clear()
def _handle_md_only(self) -> None:
    """Logic for the 'MD Only' action.

    Generates the markdown, remembers it and its path, reports the file name
    on the status line and refreshes the token budget metrics. Any failure is
    reported on the status line instead of being raised.
    """
    try:
        markdown, out_path, *_unused = self._do_generate()
        self.last_md = markdown
        self.last_md_path = out_path
        self.ai_status = f"md written: {out_path.name}"
        # Token budget metrics are refreshed against the markdown just built.
        self._refresh_api_metrics({}, md_content=markdown)
    except Exception as e:
        self.ai_status = f"error: {e}"
def _handle_generate_send(self) -> None:
    """Logic for the 'Gen + Send' action.

    Generates the markdown context, then puts a ``user_request`` event with
    the current prompt on the async event queue. A generation failure is
    reported on the status line and nothing is sent.
    """
    try:
        generated = self._do_generate()
        md, path, file_items, stable_md, disc_text = generated
        self.last_md = md
        self.last_md_path = path
        self.last_file_items = file_items
    except Exception as e:
        self.ai_status = f"generate error: {e}"
        return
    self.ai_status = "sending..."
    # Prepare event payload
    request = events.UserRequestEvent(
        prompt=self.ui_ai_input,
        stable_md=stable_md,
        file_items=file_items,
        disc_text=disc_text,
        base_dir=self.ui_files_base_dir,
    )
    # Hand the event over to the loop running in the background thread.
    enqueue = self.event_queue.put("user_request", request)
    asyncio.run_coroutine_threadsafe(enqueue, self._loop)
def _run_event_loop(self) -> None:
    """Runs the internal asyncio event loop.

    Installs ``self._loop`` as the current loop of the calling thread,
    schedules the event-queue consumer plus a fallback pump that drains the
    pending GUI-task and history queues every 0.1 s, then blocks in
    ``run_forever`` until the loop is stopped.
    """
    asyncio.set_event_loop(self._loop)
    self._loop.create_task(self._process_event_queue())

    # Fallback: process queues even if GUI thread is idling/stuck
    async def queue_fallback():
        last_error = None
        while True:
            try:
                self._process_pending_gui_tasks()
                self._process_pending_history_adds()
            except Exception as exc:
                # Best effort: the pump must keep running, but only Exception is
                # caught so SystemExit/KeyboardInterrupt still propagate. Each
                # distinct failure is reported once rather than every 0.1 s.
                described = repr(exc)
                if described != last_error:
                    last_error = described
                    sys.stderr.write(f"[queue_fallback] error while draining queues: {described}\n")
            else:
                last_error = None
            await asyncio.sleep(0.1)

    self._loop.create_task(queue_fallback())
    self._loop.run_forever()
def shutdown(self) -> None:
    """Cleanly shuts down the app's background tasks.

    Asks the event loop to stop (when it is running), waits up to 2 s for the
    loop thread and up to 1 s each for the send and model-fetch threads.
    """
    loop = self._loop
    if loop.is_running():
        loop.call_soon_threadsafe(loop.stop)
    if self._loop_thread.is_alive():
        self._loop_thread.join(timeout=2.0)
    # The worker threads may not exist yet, hence the truthiness check.
    sender = self.send_thread
    if sender and sender.is_alive():
        sender.join(timeout=1.0)
    fetcher = self.models_thread
    if fetcher and fetcher.is_alive():
        fetcher.join(timeout=1.0)
async def _process_event_queue(self) -> None:
    """Listens for and processes events from the AsyncEventQueue.

    ``user_request`` events are handled on an executor thread. Response and
    MMA state/stream events are wrapped into GUI tasks; MMA approval events
    are queued as GUI tasks unchanged. Other event names are ignored.
    """
    # Event name -> GUI task action for events that only need wrapping.
    wrapped_actions = {
        "response": "handle_ai_response",
        "mma_state_update": "mma_state_update",
        "mma_stream": "mma_stream_append",
    }
    while True:
        event_name, payload = await self.event_queue.get()
        if event_name == "user_request":
            # Run the request off the loop so the loop is never blocked.
            self._loop.run_in_executor(None, self._handle_request_event, payload)
            continue
        action = wrapped_actions.get(event_name) if isinstance(event_name, str) else None
        if action is not None:
            gui_task = {"action": action, "payload": payload}
        elif event_name in ("mma_spawn_approval", "mma_step_approval"):
            # Approval payloads already have the structure the GUI task
            # handlers expect, so they are queued as they are.
            gui_task = payload
        else:
            continue
        with self._pending_gui_tasks_lock:
            self._pending_gui_tasks.append(gui_task)
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
    """Processes a UserRequestEvent by calling the AI client.

    Optionally queues the user prompt for the discussion history, pushes the
    current prompts/model parameters/tool settings into the AI client, sends
    the request and publishes the outcome (reply or error) as a ``response``
    event on the async queue.
    """

    def publish(body: dict) -> None:
        # Results go back through the queue owned by the background loop.
        asyncio.run_coroutine_threadsafe(self.event_queue.put("response", body), self._loop)

    if self.ui_auto_add_history:
        with self._pending_history_adds_lock:
            self._pending_history_adds.append({
                "role": "User",
                "content": event.prompt,
                "collapsed": False,
                "ts": project_manager.now_ts(),
            })
    prompt_parts = [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]
    # Empty prompts are dropped so no stray blank separator is produced.
    ai_client.set_custom_system_prompt("\n\n".join(part for part in prompt_parts if part))
    ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
    ai_client.set_agent_tools(self.ui_agent_tools)
    try:
        reply = ai_client.send(
            event.stable_md,
            event.prompt,
            event.base_dir,
            event.file_items,
            event.disc_text,
            pre_tool_callback=self._confirm_and_run,
            qa_callback=ai_client.run_tier4_analysis,
        )
        publish({"text": reply, "status": "done"})
    except ProviderError as e:
        publish({"text": e.ui_message(), "status": "error", "role": "Vendor API"})
    except Exception as e:
        publish({"text": f"ERROR: {e}", "status": "error", "role": "System"})
def _test_callback_func_write_to_file(self, data: str) -> None:
    """A dummy function that a custom_callback would execute for testing.

    Writes ``data`` to a fixed artifact file, replacing previous content.
    """
    # The path is relative to the directory the tests are run from; this
    # helper exists for testing purposes only.
    artifact = "tests/artifacts/temp_callback_output.txt"
    with open(artifact, "w") as sink:
        sink.write(data)
def _recalculate_session_usage(self) -> None:
    """Rebuild ``self.session_usage`` from the AI client's comms log.

    Token counters are summed over every ``response`` entry that carries a
    ``usage`` mapping. ``last_latency`` is not derivable from the log, so the
    value already stored in ``self.session_usage`` is carried over instead of
    being reset to 0.0 (callers store the latency right before recalculating).
    """
    token_keys = (
        "input_tokens",
        "output_tokens",
        "cache_read_input_tokens",
        "cache_creation_input_tokens",
        "total_tokens",
    )
    totals = dict.fromkeys(token_keys, 0)
    for entry in ai_client.get_comms_log():
        if entry.get("kind") != "response":
            continue
        reported = entry.get("payload", {}).get("usage")
        if not reported:
            # No usage block (or an empty/None one): nothing to add.
            continue
        for key in token_keys:
            # Providers may report None for counters they do not support.
            totals[key] += reported.get(key, 0) or 0
    previous = getattr(self, "session_usage", None) or {}
    totals["last_latency"] = previous.get("last_latency", 0.0)
    self.session_usage = totals
def _refresh_api_metrics(self, payload: dict, md_content: str | None = None) -> None:
"""Refresh session usage, token-budget figures and the Gemini cache label.

Args:
payload: Event payload; the "latency" and "cache_stats" keys are read when present.
md_content: Markdown passed to ``ai_client.get_history_bleed_stats``;
``self.last_md`` is used when this is falsy.
"""
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
# NOTE(review): confirm that _recalculate_session_usage() keeps the last_latency
# value written above rather than resetting it.
self._recalculate_session_usage()
def fetch_stats():
# Runs on a daemon thread (started below); failures are ignored and leave
# the previous token-budget values in place.
try:
stats = ai_client.get_history_bleed_stats(md_content=md_content or self.last_md)
# "percentage" is divided by 100 before being stored as the budget fraction.
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
self._token_budget_current = stats.get("current", 0)
self._token_budget_limit = stats.get("limit", 0)
except Exception:
pass
threading.Thread(target=fetch_stats, daemon=True).start()
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
# Size is shown in KB with one decimal.
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def cb_load_prior_log(self) -> None:
    """Let the user pick a session log file and load it for viewing.

    The file is read as UTF-8, one JSON document per line; blank lines and
    lines that are not valid JSON are skipped. On success the entries are
    stored in ``self.prior_session_entries`` and prior-session viewing is
    switched on. Cancelling the dialog does nothing; a read failure is
    reported on the status line.
    """
    root = hide_tk_root()
    try:
        path = filedialog.askopenfilename(
            title="Load Session Log",
            initialdir="logs",
            filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")]
        )
    finally:
        # Always dispose of the hidden Tk root, even if the dialog raises,
        # so no invisible top-level window is left behind.
        root.destroy()
    if not path:
        return
    entries = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    # Tolerate corrupt or partial lines in the log.
                    continue
    except Exception as e:
        self.ai_status = f"log load error: {e}"
        return
    self.prior_session_entries = entries
    self.is_viewing_prior_session = True
    self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)"
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None:
"""Ask for confirmation of a PowerShell script and run it when approved.

A ``ConfirmDialog`` is created and registered (in ``_pending_actions`` when
``--headless`` is on the command line, otherwise as ``_pending_dialog``);
the call then blocks in ``dialog.wait()`` until a decision is available.

Args:
script: Script text proposed for execution.
base_dir: Directory passed to the dialog and to the shell runner.
qa_callback: Forwarded to ``shell_runner.run_powershell``.

Returns:
The output of ``shell_runner.run_powershell``, or None when the script
was rejected.
"""
print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}")
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
# Headless: the dialog is stored by its uid in self._pending_actions.
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
print(f"[PENDING_ACTION] Created action {dialog._uid}")
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
# Notify API hook subscribers
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
print("[DEBUG] Pushing script_confirmation_required event to queue")
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
# wait() yields the decision together with the script text to use from here on.
approved, final_script = dialog.wait()
if is_headless:
# Drop the resolved action from the headless registry.
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
print(f"[DEBUG] _confirm_and_run result: approved={approved}")
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
print(f"[DEBUG] Running powershell in {base_dir}")
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
    """Resolves a pending PowerShell script confirmation by its ID.

    The ID is looked up in the pending-actions registry first and then
    compared with the single pending dialog.

    Args:
        action_id: The unique identifier for the pending action.
        approved: True if the script should be executed, False otherwise.

    Returns:
        bool: True if the action was found and resolved, False otherwise.
    """
    with self._pending_dialog_lock:
        if action_id in self._pending_actions:
            target = self._pending_actions[action_id]
        elif self._pending_dialog and self._pending_dialog._uid == action_id:
            target = self._pending_dialog
        else:
            return False
        # Publish the decision and wake whoever is blocked on the dialog.
        with target._condition:
            target._approved = approved
            target._done = True
            target._condition.notify_all()
        return True
def _append_tool_log(self, script: str, result: str) -> None:
    """Record a script run in the tool log and surface it in the UI.

    The script and its result become the "last script" texts, the script
    output window is shown with a blink, and the tool-call list is scrolled
    to the bottom when auto-scroll is enabled.
    """
    stamped = (script, result, time.time())
    self._tool_log.append(stamped)
    self.ui_last_script_text = script
    self.ui_last_script_output = result
    self._trigger_script_blink = True
    self.show_script_output = True
    if not self.ui_auto_scroll_tool_calls:
        return
    self._scroll_tool_calls_to_bottom = True
def _flush_to_project(self) -> None:
    """Copy the current UI state into the in-memory project dict.

    Covers output/files/screenshots settings, project options, the Gemini CLI
    path, per-tool agent switches, the discussion section (entries, roles,
    active discussion, auto-add flag) and the MMA state.
    """
    project = self.project
    project.setdefault("output", {})["output_dir"] = self.ui_output_dir

    files_sec = project.setdefault("files", {})
    files_sec["base_dir"] = self.ui_files_base_dir
    files_sec["paths"] = self.files

    shots_sec = project.setdefault("screenshots", {})
    shots_sec["base_dir"] = self.ui_shots_base_dir
    shots_sec["paths"] = self.screenshots

    options = project.setdefault("project", {})
    options["git_dir"] = self.ui_project_git_dir
    options["system_prompt"] = self.ui_project_system_prompt
    options["main_context"] = self.ui_project_main_context
    options["word_wrap"] = self.ui_word_wrap
    options["summary_only"] = self.ui_summary_only
    options["auto_scroll_comms"] = self.ui_auto_scroll_comms
    options["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls

    project.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path

    tool_flags = project.setdefault("agent", {}).setdefault("tools", {})
    for tool_name in AGENT_TOOL_NAMES:
        # Tools without an explicit setting count as enabled.
        tool_flags[tool_name] = self.ui_agent_tools.get(tool_name, True)

    self._flush_disc_entries_to_project()
    discussion = project.setdefault("discussion", {})
    discussion["roles"] = self.disc_roles
    discussion["active"] = self.active_discussion
    discussion["auto_add"] = self.ui_auto_add_history

    # Save MMA State
    mma = project.setdefault("mma", {})
    mma["epic"] = self.ui_epic_input
    if not self.active_track:
        mma["active_track"] = None
    else:
        # The whole track is serialized through dataclasses.asdict.
        from dataclasses import asdict
        mma["active_track"] = asdict(self.active_track)
def _flush_to_config(self) -> None:
    """Copy AI, project-list, window and theme settings into ``self.config``.

    The "ai", "projects" and "gui" sections are replaced wholesale; the theme
    module then adds its own settings to the same config dict.
    """
    ai_section = {
        "provider": self.current_provider,
        "model": self.current_model,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
        "history_trunc_limit": self.history_trunc_limit,
        "system_prompt": self.ui_global_system_prompt,
    }
    self.config["ai"] = ai_section
    self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path}
    self.config["gui"] = {"show_windows": self.show_windows}
    theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list, str, str]:
    """Returns (full_md, output_path, file_items, stable_md, discussion_text).

    Project and config state are flushed and saved first, so the aggregate
    is built from what is currently shown in the UI.
    """
    self._flush_to_project()
    self._save_active_project()
    self._flush_to_config()
    save_config(self.config)

    track_id = None
    if self.active_track:
        track_id = self.active_track.id
    flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id)
    full_md, path, file_items = aggregate.run(flat)

    # Stable markdown leaves the history out (used for Gemini caching).
    shots_cfg = flat.get("screenshots", {})
    stable_md = aggregate.build_markdown_no_history(
        file_items,
        Path(shots_cfg.get("base_dir", ".")),
        shots_cfg.get("paths", []),
        summary_only=flat.get("project", {}).get("summary_only", False),
    )

    # The discussion history is rendered as a separate text.
    discussion_text = aggregate.build_discussion_text(flat.get("discussion", {}).get("history", []))
    return full_md, path, file_items, stable_md, discussion_text
def _fetch_models(self, provider: str) -> None:
"""Fetch the model list for ``provider`` on a background daemon thread.

The worker stores the result in ``self.available_models`` and reports
progress or failure through ``self.ai_status``; the thread object is kept
in ``self.models_thread``.
"""
self.ai_status = "fetching models..."
def do_fetch():
try:
models = ai_client.list_models(provider)
self.available_models = models
# Fall back to the first listed model when the current one is not offered.
if self.current_model not in models and models:
self.current_model = models[0]
# Note: the provider passed on is self.current_provider, not the `provider` argument.
ai_client.set_provider(self.current_provider, self.current_model)
self.ai_status = f"models loaded: {len(models)}"
except Exception as e:
self.ai_status = f"model fetch error: {e}"
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str) -> None:
    """Draw a "[+]" button that opens ``content`` in the text viewer.

    The button ID is derived from ``id(content)``.
    """
    button_id = f"[+]##{id(content)}"
    if not imgui.button(button_id):
        return
    self.show_text_viewer = True
    self.text_viewer_title = label
    self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str) -> None:
    """Draw a labelled text value with a "[+]" button for the text viewer.

    With word wrap on, the text is drawn wrapped to the available width.
    With word wrap off, content longer than ``COMMS_CLAMP_CHARS`` goes into a
    fixed-height read-only multiline field, shorter content is drawn as plain
    text. Short empty content is shown as "(empty)".
    """
    imgui.text_colored(C_LBL, f"{label}:")
    imgui.same_line()
    if imgui.button("[+]##" + label):
        self.show_text_viewer = True
        self.text_viewer_title = label
        self.text_viewer_content = content

    is_long = len(content) > COMMS_CLAMP_CHARS
    if self.ui_word_wrap:
        # Only the short variant substitutes the "(empty)" placeholder.
        shown = content if is_long else (content or "(empty)")
        imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
        imgui.text(shown)
        imgui.pop_text_wrap_pos()
    elif is_long:
        imgui.begin_child(f"heavy_text_child_{label}", imgui.ImVec2(0, 80), True)
        imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
        imgui.end_child()
    else:
        imgui.text(content or "(empty)")
# ---------------------------------------------------------------- gui
def _show_menus(self) -> None:
    """Draw the "Windows" and "Project" menus.

    "Windows" toggles the visibility flag of every known window. "Project"
    offers saving everything, resetting the AI session and generating the
    markdown without sending it.
    """
    if imgui.begin_menu("Windows"):
        for window_name in self.show_windows:
            _clicked, visible = imgui.menu_item(window_name, "", self.show_windows[window_name])
            self.show_windows[window_name] = visible
        imgui.end_menu()

    if not imgui.begin_menu("Project"):
        return

    save_clicked = imgui.menu_item("Save All", "", False)[0]
    if save_clicked:
        self._flush_to_project()
        self._save_active_project()
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "config saved"

    reset_clicked = imgui.menu_item("Reset Session", "", False)[0]
    if reset_clicked:
        ai_client.reset_session()
        ai_client.clear_comms_log()
        self._tool_log.clear()
        self._comms_log.clear()
        self.ai_status = "session reset"
        self.ai_response = ""

    generate_clicked = imgui.menu_item("Generate MD Only", "", False)[0]
    if generate_clicked:
        try:
            markdown, out_path, *_unused = self._do_generate()
            self.last_md = markdown
            self.last_md_path = out_path
            self.ai_status = f"md written: {out_path.name}"
        except Exception as e:
            self.ai_status = f"error: {e}"

    imgui.end_menu()
def _gui_func(self) -> None:
# Main GUI render routine (presumably invoked once per rendered frame; the
# perf monitor's start_frame()/end_frame() bracket most of the body).
# In order it: drains queued GUI tasks, renders the track-proposal modal,
# autosaves on an interval, merges pending comms/tool-call entries into the
# GUI logs, draws the main menu bar, the hub/tier windows, the diagnostics
# window, the approval modals, the "Last Script Output" window and the
# text viewer. Any exception raised while rendering is caught at the bottom,
# printed with a traceback, and not re-raised.
try:
self.perf_monitor.start_frame()
# Process GUI task queue
self._process_pending_gui_tasks()
self._render_track_proposal_modal()
# Auto-save (every 60s)
# The actual interval is read from self._autosave_interval.
now = time.time()
if now - self._last_autosave >= self._autosave_interval:
self._last_autosave = now
try:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except Exception:
pass # silent — don't disrupt the GUI loop
# Sync pending comms
# Entries are moved from the pending list into the GUI log while holding
# the lock; a scroll-to-bottom is requested only when auto-scroll is on
# and something new actually arrived.
with self._pending_comms_lock:
if self._pending_comms and self.ui_auto_scroll_comms:
self._scroll_comms_to_bottom = True
for c in self._pending_comms:
self._comms_log.append(c)
self._pending_comms.clear()
# Same hand-off for pending tool calls.
with self._pending_tool_calls_lock:
if self._pending_tool_calls and self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
for tc in self._pending_tool_calls:
self._tool_log.append(tc)
self._pending_tool_calls.clear()
# ---- Menubar
if imgui.begin_main_menu_bar():
if imgui.begin_menu("manual slop"):
if imgui.menu_item("Quit", "Ctrl+Q", False)[0]:
self.should_quit = True
imgui.end_menu()
# "View" toggles the visibility flag of every registered window.
if imgui.begin_menu("View"):
for name in self.show_windows:
_, self.show_windows[name] = imgui.menu_item(name, "", self.show_windows[name])
imgui.end_menu()
if imgui.begin_menu("Project"):
if imgui.menu_item("Save All", "Ctrl+S", False)[0]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
if imgui.menu_item("Generate MD Only", "", False)[0]:
self._handle_md_only()
if imgui.menu_item("Reset Session", "", False)[0]:
self._handle_reset_session()
imgui.end_menu()
imgui.end_main_menu_bar()
# --- Hubs ---
# Each window follows the same pattern: begin() returns (expanded, open);
# the open flag is written back to show_windows, contents are drawn only
# when expanded, and end() is always called after begin().
if self.show_windows.get("Context Hub", False):
exp, self.show_windows["Context Hub"] = imgui.begin("Context Hub", self.show_windows["Context Hub"])
if exp:
self._render_projects_panel()
imgui.end()
if self.show_windows.get("Files & Media", False):
exp, self.show_windows["Files & Media"] = imgui.begin("Files & Media", self.show_windows["Files & Media"])
if exp:
if imgui.collapsing_header("Files"):
self._render_files_panel()
if imgui.collapsing_header("Screenshots"):
self._render_screenshots_panel()
imgui.end()
if self.show_windows.get("AI Settings", False):
exp, self.show_windows["AI Settings"] = imgui.begin("AI Settings", self.show_windows["AI Settings"])
if exp:
if imgui.collapsing_header("Provider & Model"):
self._render_provider_panel()
if imgui.collapsing_header("System Prompts"):
self._render_system_prompts_panel()
imgui.end()
if self.show_windows.get("MMA Dashboard", False):
exp, self.show_windows["MMA Dashboard"] = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"])
if exp:
self._render_mma_dashboard()
imgui.end()
# Tier stream windows; Tier 3 passes None as the second argument.
if self.show_windows.get("Tier 1: Strategy", False):
exp, self.show_windows["Tier 1: Strategy"] = imgui.begin("Tier 1: Strategy", self.show_windows["Tier 1: Strategy"])
if exp:
self._render_tier_stream_panel("Tier 1", "Tier 1")
imgui.end()
if self.show_windows.get("Tier 2: Tech Lead", False):
exp, self.show_windows["Tier 2: Tech Lead"] = imgui.begin("Tier 2: Tech Lead", self.show_windows["Tier 2: Tech Lead"])
if exp:
self._render_tier_stream_panel("Tier 2", "Tier 2 (Tech Lead)")
imgui.end()
if self.show_windows.get("Tier 3: Workers", False):
exp, self.show_windows["Tier 3: Workers"] = imgui.begin("Tier 3: Workers", self.show_windows["Tier 3: Workers"])
if exp:
self._render_tier_stream_panel("Tier 3", None)
imgui.end()
if self.show_windows.get("Tier 4: QA", False):
exp, self.show_windows["Tier 4: QA"] = imgui.begin("Tier 4: QA", self.show_windows["Tier 4: QA"])
if exp:
self._render_tier_stream_panel("Tier 4", "Tier 4 (QA)")
imgui.end()
# The theme panel is not wrapped in begin()/end() here.
if self.show_windows.get("Theme", False):
self._render_theme_panel()
if self.show_windows.get("Discussion Hub", False):
exp, self.show_windows["Discussion Hub"] = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"])
if exp:
# Top part for the history
# Negative height: the child leaves 200 px below it for the tab bar.
imgui.begin_child("HistoryChild", size=(0, -200))
self._render_discussion_panel()
imgui.end_child()
# Bottom part with tabs for message and response
if imgui.begin_tab_bar("MessageResponseTabs"):
if imgui.begin_tab_item("Message")[0]:
self._render_message_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Response")[0]:
self._render_response_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
if self.show_windows.get("Operations Hub", False):
exp, self.show_windows["Operations Hub"] = imgui.begin("Operations Hub", self.show_windows["Operations Hub"])
if exp:
if imgui.begin_tab_bar("OperationsTabs"):
if imgui.begin_tab_item("Tool Calls")[0]:
self._render_tool_calls_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Comms History")[0]:
self._render_comms_history_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
# Log Management is not wrapped in begin()/end() here.
if self.show_windows.get("Log Management", False):
self._render_log_management()
# NOTE(review): unlike the windows above, this indexes show_windows directly,
# so a missing "Diagnostics" key raises KeyError (caught by the outer except).
if self.show_windows["Diagnostics"]:
exp, self.show_windows["Diagnostics"] = imgui.begin("Diagnostics", self.show_windows["Diagnostics"])
if exp:
now = time.time()
# Sample the perf monitor at most every 0.5 s; each history list drops
# its oldest value and appends the newest, so its length stays constant.
if now - self._perf_last_update >= 0.5:
self._perf_last_update = now
metrics = self.perf_monitor.get_metrics()
self.perf_history["frame_time"].pop(0)
self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0))
self.perf_history["fps"].pop(0)
self.perf_history["fps"].append(metrics.get("fps", 0.0))
self.perf_history["cpu"].pop(0)
self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0))
self.perf_history["input_lag"].pop(0)
self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0))
# Metrics are fetched (again) here for the table below.
metrics = self.perf_monitor.get_metrics()
imgui.text("Performance Telemetry")
imgui.separator()
if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h):
imgui.table_setup_column("Metric")
imgui.table_setup_column("Value")
imgui.table_headers_row()
imgui.table_next_row()
imgui.table_next_column()
imgui.text("FPS")
imgui.table_next_column()
imgui.text(f"{metrics.get('fps', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Frame Time (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("CPU %")
imgui.table_next_column()
imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Input Lag (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}")
imgui.end_table()
imgui.separator()
# Only the frame-time and CPU histories are plotted.
imgui.text("Frame Time (ms)")
imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60))
imgui.text("CPU %")
imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60))
imgui.end()
# NOTE(review): end_frame() is not reached when anything above raises,
# and the modals below are rendered after it.
self.perf_monitor.end_frame()
# ---- Modals / Popups
# Snapshot the pending dialog under its lock, then render from the snapshot.
with self._pending_dialog_lock:
dlg = self._pending_dialog
# open_popup() is issued once when a dialog becomes pending;
# _pending_dialog_open remembers that it was already requested.
if dlg:
if not self._pending_dialog_open:
imgui.open_popup("Approve PowerShell Command")
self._pending_dialog_open = True
else:
self._pending_dialog_open = False
if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not dlg:
imgui.close_current_popup()
else:
imgui.text("The AI wants to run the following PowerShell script:")
imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}")
imgui.separator()
# Checkbox to toggle full preview inside modal
# NOTE(review): this reuses self.show_text_viewer, the same flag that
# gates the "Text Viewer" window at the end of this function.
_, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer)
if self.show_text_viewer:
imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(dlg._script)
imgui.end_child()
else:
# Editable view: edits are written straight back to dlg._script.
ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200))
imgui.separator()
# Both buttons record the decision on the dialog, notify everything
# waiting on dlg._condition, and clear the pending dialog under its lock.
if imgui.button("Approve & Run", imgui.ImVec2(120, 0)):
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(120, 0)):
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.end_popup()
# Tool-execution approval: same open-once pattern, keyed on _ask_dialog_open.
if self._pending_ask_dialog:
if not self._ask_dialog_open:
imgui.open_popup("Approve Tool Execution")
self._ask_dialog_open = True
else:
self._ask_dialog_open = False
if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_ask_dialog:
imgui.close_current_popup()
else:
tool_name = self._ask_tool_data.get("tool", "unknown")
tool_args = self._ask_tool_data.get("args", {})
imgui.text("The AI wants to execute a tool:")
imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}")
imgui.separator()
imgui.text("Arguments:")
imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True)
imgui.text_unformatted(json.dumps(tool_args, indent=2))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_approve_ask()
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Deny", imgui.ImVec2(120, 0)):
self._handle_reject_ask()
imgui.close_current_popup()
imgui.end_popup()
# MMA Step Approval Modal
# On first display the edit mode is reset and an editable copy of the
# payload is taken from the pending approval.
if self._pending_mma_approval:
if not self._mma_approval_open:
imgui.open_popup("MMA Step Approval")
self._mma_approval_open = True
self._mma_approval_edit_mode = False
self._mma_approval_payload = self._pending_mma_approval.get("payload", "")
else:
self._mma_approval_open = False
if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_approval:
imgui.close_current_popup()
else:
ticket_id = self._pending_mma_approval.get("ticket_id", "??")
imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.")
imgui.separator()
if self._mma_approval_edit_mode:
imgui.text("Edit Raw Payload (Manual Memory Mutation):")
_, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400))
else:
# The preview shows the original pending payload, not the edited copy.
imgui.text("Proposed Tool Call:")
imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(str(self._pending_mma_approval.get("payload", "")))
imgui.end_child()
imgui.separator()
# Approve always sends the (possibly edited) copy held in _mma_approval_payload.
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)):
self._mma_approval_edit_mode = not self._mma_approval_edit_mode
imgui.same_line()
if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False)
imgui.close_current_popup()
imgui.end_popup()
# MMA Spawn Approval Modal
# On first display, editable copies of the prompt and context markdown
# are taken from the pending spawn request.
if self._pending_mma_spawn:
if not self._mma_spawn_open:
imgui.open_popup("MMA Spawn Approval")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
else:
self._mma_spawn_open = False
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_spawn:
imgui.close_current_popup()
else:
role = self._pending_mma_spawn.get("role", "??")
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
imgui.separator()
if self._mma_spawn_edit_mode:
imgui.text("Edit Prompt:")
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
imgui.text("Edit Context MD:")
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
else:
# Here the preview shows the editable copies, so edits are visible.
imgui.text("Proposed Prompt:")
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
imgui.text_unformatted(self._mma_spawn_prompt)
imgui.end_child()
imgui.text("Proposed Context MD:")
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
imgui.text_unformatted(self._mma_spawn_context)
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
imgui.same_line()
if imgui.button("Abort", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False, abort=True)
imgui.close_current_popup()
imgui.end_popup()
# ---- Last Script Output window
if self.show_script_output:
# A blink request starts a timed highlight and tries to focus the window.
if self._trigger_script_blink:
self._trigger_script_blink = False
self._is_script_blinking = True
self._script_blink_start_time = time.time()
try:
imgui.set_window_focus("Last Script Output")
except Exception:
pass
# While blinking (up to 1.5 s), frame/child backgrounds are tinted; the
# tint alpha switches between 60/255 and 0 with the sign of a sine wave.
if self._is_script_blinking:
elapsed = time.time() - self._script_blink_start_time
if elapsed > 1.5:
self._is_script_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 60/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha))
imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever)
expanded, self.show_script_output = imgui.begin("Last Script Output", self.show_script_output)
if expanded:
imgui.text("Script:")
imgui.same_line()
self._render_text_viewer("Last Script", self.ui_last_script_text)
# Word-wrap on: wrapped text in a child; off: read-only multiline input.
if self.ui_word_wrap:
imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_text)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.text("Output:")
imgui.same_line()
self._render_text_viewer("Last Output", self.ui_last_script_output)
if self.ui_word_wrap:
imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_output)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
# Pops the two style colors pushed above while the blink is still active.
if self._is_script_blinking:
imgui.pop_style_color(2)
imgui.end()
# ---- Text viewer window (opened by the "[+]" buttons)
if self.show_text_viewer:
imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever)
expanded, self.show_text_viewer = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer)
if expanded:
if self.ui_word_wrap:
imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.text_viewer_content)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end()
# Top-level boundary: report the failure on stdout and keep the GUI running.
except Exception as e:
print(f"ERROR in _gui_func: {e}")
import traceback
traceback.print_exc()
def _render_projects_panel(self) -> None:
    """Render the project panel.

    Shows the active project name, the git / main-context / output path
    fields (each with a Browse dialog), the list of known project files with
    remove and switch buttons, Add / New / Save All actions, the UI toggles,
    the per-tool enable checkboxes, and the MMA epic input.
    """
    project_section = self.project.get("project", {})
    proj_name = project_section.get("name", Path(self.active_project_path).stem)
    imgui.text_colored(C_IN, f"Active: {proj_name}")
    imgui.separator()

    # --- Git directory
    imgui.text("Git Directory")
    _, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir)
    imgui.same_line()
    if imgui.button("Browse##git"):
        tk_root = hide_tk_root()
        chosen_dir = filedialog.askdirectory(title="Select Git Directory")
        tk_root.destroy()
        if chosen_dir:
            self.ui_project_git_dir = chosen_dir
    imgui.separator()

    # --- Main context file
    imgui.text("Main Context File")
    _, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context)
    imgui.same_line()
    if imgui.button("Browse##ctx"):
        tk_root = hide_tk_root()
        chosen_file = filedialog.askopenfilename(title="Select Main Context File")
        tk_root.destroy()
        if chosen_file:
            self.ui_project_main_context = chosen_file
    imgui.separator()

    # --- Output directory
    imgui.text("Output Dir")
    _, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir)
    imgui.same_line()
    if imgui.button("Browse##out"):
        tk_root = hide_tk_root()
        chosen_dir = filedialog.askdirectory(title="Select Output Dir")
        tk_root.destroy()
        if chosen_dir:
            self.ui_output_dir = chosen_dir
    imgui.separator()

    # --- Known project files
    imgui.text("Project Files")
    imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True)
    for row, project_path in enumerate(self.project_paths):
        is_active = project_path == self.active_project_path
        if imgui.button(f"x##p{row}"):
            dropped = self.project_paths.pop(row)
            if dropped == self.active_project_path and self.project_paths:
                self._switch_project(self.project_paths[0])
            # The list was mutated; stop drawing rows for this frame.
            break
        imgui.same_line()
        suffix = " *" if is_active else ""
        if is_active:
            imgui.push_style_color(imgui.Col_.text, C_IN)
        if imgui.button(f"{Path(project_path).stem}{suffix}##ps{row}"):
            self._switch_project(project_path)
        # Uses the flag computed before any switch, so push and pop stay balanced.
        if is_active:
            imgui.pop_style_color()
        imgui.same_line()
        imgui.text_colored(C_LBL, project_path)
    imgui.end_child()

    # --- Project actions
    if imgui.button("Add Project"):
        tk_root = hide_tk_root()
        picked = filedialog.askopenfilename(
            title="Select Project .toml",
            filetypes=[("TOML", "*.toml"), ("All", "*.*")],
        )
        tk_root.destroy()
        if picked and picked not in self.project_paths:
            self.project_paths.append(picked)
    imgui.same_line()
    if imgui.button("New Project"):
        tk_root = hide_tk_root()
        target = filedialog.asksaveasfilename(
            title="Create New Project .toml",
            defaultextension=".toml",
            filetypes=[("TOML", "*.toml"), ("All", "*.*")],
        )
        tk_root.destroy()
        if target:
            new_project = project_manager.default_project(Path(target).stem)
            project_manager.save_project(new_project, target)
            if target not in self.project_paths:
                self.project_paths.append(target)
            self._switch_project(target)
    imgui.same_line()
    if imgui.button("Save All"):
        self._flush_to_project()
        self._save_active_project()
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "config saved"

    # --- UI toggles
    _, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap)
    _, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only)
    _, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms)
    _, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls)

    # --- Per-tool enable flags (a tool counts as enabled until stored otherwise)
    if imgui.collapsing_header("Agent Tools"):
        for tool_name in AGENT_TOOL_NAMES:
            enabled = self.ui_agent_tools.get(tool_name, True)
            toggled, enabled = imgui.checkbox(f"Enable {tool_name}", enabled)
            if toggled:
                self.ui_agent_tools[tool_name] = enabled
    imgui.separator()

    # --- MMA epic input
    imgui.text_colored(C_LBL, 'MMA Orchestration')
    _, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80))
    if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)):
        self._cb_plan_epic()
def _cb_plan_epic(self) -> None:
    """Start Tier 1 epic planning on a daemon thread.

    The worker generates tracks from ``ui_epic_input``, totals the token
    usage of the responses logged during that call, and queues three GUI
    tasks: the Tier 1 usage update, the response text, and the
    track-proposal display. Failures are reported through ``ai_status`` and
    printed.
    """

    def _plan_worker():
        try:
            self.ai_status = "Planning Epic (Tier 1)..."
            history = orchestrator_pm.get_track_history_summary()
            loaded_project = project_manager.load_project(self.active_project_path)
            flat = project_manager.flat_config(loaded_project)
            file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))

            # Remember the comms-log length so only entries added by this call are counted.
            log_mark = len(ai_client.get_comms_log())
            tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
            fresh_entries = ai_client.get_comms_log()[log_mark:]
            responses = [
                entry for entry in fresh_entries
                if entry.get("direction") == "IN" and entry.get("kind") == "response"
            ]

            def _usage_total(field):
                return sum(entry.get("payload", {}).get("usage", {}).get(field, 0) for entry in responses)

            tokens_in = _usage_total("input_tokens")
            tokens_out = _usage_total("output_tokens")

            def _push_t1_usage(i, o):
                self.mma_tier_usage["Tier 1"]["input"] += i
                self.mma_tier_usage["Tier 1"]["output"] += o

            with self._pending_gui_tasks_lock:
                enqueue = self._pending_gui_tasks.append
                enqueue({
                    "action": "custom_callback",
                    "callback": _push_t1_usage,
                    "args": [tokens_in, tokens_out]
                })
                enqueue({
                    "action": "handle_ai_response",
                    "payload": {
                        "text": json.dumps(tracks, indent=2),
                        "stream_id": "Tier 1",
                        "status": "Epic tracks generated."
                    }
                })
                enqueue({
                    "action": "show_track_proposal",
                    "payload": tracks
                })
        except Exception as exc:
            self.ai_status = f"Epic plan error: {exc}"
            print(f"ERROR in _cb_plan_epic background task: {exc}")

    threading.Thread(target=_plan_worker, daemon=True).start()
def _cb_accept_tracks(self) -> None:
    """Accept every proposed track and start them from a daemon thread.

    Hides the proposal modal, then in the background builds one shared
    skeleton summary of the project's ``.py`` files and passes it to
    ``_start_track_logic`` for each proposed track, so the files are only
    scanned once. A GUI refresh task is queued when all tracks are handed
    off. Progress and errors are reported through ``ai_status``.
    """
    self._show_track_proposal_modal = False

    def _bg_task():
        # Generate skeletons once
        self.ai_status = "Phase 2: Generating skeletons for all tracks..."
        parser = ASTParser(language="python")
        skeleton_parts = []
        try:
            for i, file_path in enumerate(self.files):
                try:
                    self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
                    abs_path = Path(self.ui_files_base_dir) / file_path
                    if abs_path.exists() and abs_path.suffix == ".py":
                        with open(abs_path, "r", encoding="utf-8") as f:
                            code = f.read()
                        # Real newlines: the previous "\\n" literals emitted a
                        # backslash followed by "n" instead of line breaks.
                        skeleton_parts.append(f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n")
                except Exception as e:
                    # One unreadable/unparsable file must not abort the whole scan.
                    print(f"Error parsing skeleton for {file_path}: {e}")
        except Exception as e:
            self.ai_status = f"Error generating skeletons: {e}"
            print(f"Error generating skeletons: {e}")
            return  # Exit if skeleton generation fails
        generated_skeletons = "".join(skeleton_parts)

        # Now loop through tracks and call _start_track_logic with generated skeletons
        total_tracks = len(self.proposed_tracks)
        for i, track_data in enumerate(self.proposed_tracks):
            title = track_data.get("title") or track_data.get("goal", "Untitled Track")
            self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..."
            self._start_track_logic(track_data, skeletons_str=generated_skeletons)
        with self._pending_gui_tasks_lock:
            # Ensure UI refresh after tracks are started
            self._pending_gui_tasks.append({'action': 'refresh_from_project'})
        self.ai_status = f"All {total_tracks} tracks accepted and execution started."

    threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data: Any = None) -> None:
    """Start a track, identified either by track id or by proposal index.

    * ``str``: treated as a track id. The track is loaded as the active
      track if it is not already, and when an active track is available its
      conductor engine is scheduled on the internal event loop.
    * ``int`` or ``dict`` (key ``"index"``): treated as an index into
      ``proposed_tracks``; ticket generation is started on a daemon thread.
    * anything else: index 0 is used.

    An out-of-range index is ignored.
    """
    if isinstance(user_data, str):
        # If track_id is provided directly, ensure it's loaded as active
        wanted_id = user_data
        already_active = bool(self.active_track) and self.active_track.id == wanted_id
        if not already_active:
            self._cb_load_track(wanted_id)
        if self.active_track:
            # Use the active track object directly to start execution
            self.mma_status = "running"
            engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode)
            flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
            full_md, _, _ = aggregate.run(flat)
            asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
            self.ai_status = f"Track '{self.active_track.description}' started."
        return

    if isinstance(user_data, int):
        idx = user_data
    elif isinstance(user_data, dict):
        idx = user_data.get("index", 0)
    else:
        idx = 0

    if not (0 <= idx < len(self.proposed_tracks)):
        return

    track_data = self.proposed_tracks[idx]
    title = track_data.get("title") or track_data.get("goal", "Untitled Track")
    threading.Thread(target=self._start_track_logic, args=(track_data,), daemon=True).start()
    self.ai_status = f"Track '{title}' started."
def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
    """Generate tickets for one proposed track and schedule its execution.

    Steps: build (or reuse) the skeleton context, ask the tech lead for
    tickets, record Tier 2 token usage, order the tickets by dependency,
    persist the initial track state, and schedule the conductor engine on
    the internal event loop.

    Args:
        track_data: Proposed track; ``"title"`` and ``"goal"`` are read.
        skeletons_str: Pre-built skeleton text. When ``None`` the project's
            ``.py`` files are scanned here instead.

    Any exception is caught, written to ``ai_status`` and printed; nothing
    is raised to the caller.
    """
    try:
        goal = track_data.get("goal", "")
        title = track_data.get("title") or track_data.get("goal", "Untitled Track")
        self.ai_status = f"Phase 2: Generating tickets for {title}..."
        if skeletons_str is None:
            # 1. Get skeletons for context
            parser = ASTParser(language="python")
            skeleton_parts = []
            for i, file_path in enumerate(self.files):
                try:
                    self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
                    abs_path = Path(self.ui_files_base_dir) / file_path
                    if abs_path.exists() and abs_path.suffix == ".py":
                        with open(abs_path, "r", encoding="utf-8") as f:
                            code = f.read()
                        # Real newlines: the previous "\\n" literals emitted a
                        # backslash followed by "n" instead of line breaks.
                        skeleton_parts.append(f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n")
                except Exception as e:
                    # One bad file must not abort the scan.
                    print(f"Error parsing skeleton for {file_path}: {e}")
            skeletons = "".join(skeleton_parts)
        else:
            skeletons = skeletons_str  # Use provided skeletons

        # 2. Ask the tech lead for tickets; count only comms entries added by this call.
        self.ai_status = "Phase 2: Calling Tech Lead..."
        _t2_baseline = len(ai_client.get_comms_log())
        raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
        _t2_new = ai_client.get_comms_log()[_t2_baseline:]
        _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
        _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
        _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
        self.mma_tier_usage["Tier 2"]["input"] += _t2_in
        self.mma_tier_usage["Tier 2"]["output"] += _t2_out
        if not raw_tickets:
            self.ai_status = f"Error: No tickets generated for track: {title}"
            print(f"Warning: No tickets generated for track: {title}")
            return

        self.ai_status = "Phase 2: Sorting tickets..."
        try:
            sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets)
        except ValueError as e:
            # Dependency problem: fall back to the order the tickets arrived in.
            print(f"Dependency error in track '{title}': {e}")
            sorted_tickets_data = raw_tickets

        # 3. Create Track and Ticket objects
        tickets = [
            Ticket(
                id=t_data["id"],
                description=t_data.get("description") or t_data.get("goal", "No description"),
                status=t_data.get("status", "todo"),
                assigned_to=t_data.get("assigned_to", "unassigned"),
                depends_on=t_data.get("depends_on", []),
                step_mode=t_data.get("step_mode", False)
            )
            for t_data in sorted_tickets_data
        ]
        # The id is derived from project path + title, so the same pair yields the same id.
        track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}"
        track = Track(id=track_id, description=title, tickets=tickets)

        # Initialize track state in the filesystem
        from models import TrackState, Metadata
        from datetime import datetime
        now = datetime.now()
        meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now)
        state = TrackState(metadata=meta, discussion=[], tasks=tickets)
        project_manager.save_track_state(track_id, state, self.ui_files_base_dir)

        # 4. Initialize ConductorEngine and run loop
        engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode)
        # Use current full markdown context for the track execution
        flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track.id)
        full_md, _, _ = aggregate.run(flat)
        # Schedule the coroutine on the internal event loop
        asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
    except Exception as e:
        self.ai_status = f"Track start error: {e}"
        print(f"ERROR in _start_track_logic: {e}")
def _render_track_proposal_modal(self) -> None:
    """Render the "Track Proposal" modal while ``_show_track_proposal_modal`` is set.

    Each proposed track gets editable title and goal fields plus
    Remove / Start buttons; Accept starts all tracks, Cancel dismisses the
    modal.

    The modal's own close state is honored: ``begin_popup_modal`` reports
    through its second return value whether the popup is still open. The
    show flag is cleared when it is not, so closing the modal with the
    title-bar button sticks instead of the popup being re-opened on the next
    frame.
    """
    if self._show_track_proposal_modal:
        imgui.open_popup("Track Proposal")
    visible, still_open = imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)
    if not still_open:
        # Closed via the title bar (or not open at all): stop requesting it.
        self._show_track_proposal_modal = False
    if not visible:
        # begin_popup_modal returned False, so end_popup must not be called.
        return

    if not self._show_track_proposal_modal:
        imgui.close_current_popup()
        imgui.end_popup()
        return

    imgui.text_colored(C_IN, "Proposed Implementation Tracks")
    imgui.separator()
    if not self.proposed_tracks:
        imgui.text("No tracks generated.")
    else:
        for idx, track in enumerate(self.proposed_tracks):
            # Title Edit
            changed_t, new_t = imgui.input_text(f"Title##{idx}", track.get('title', ''))
            if changed_t:
                track['title'] = new_t
            # Goal Edit
            changed_g, new_g = imgui.input_text_multiline(f"Goal##{idx}", track.get('goal', ''), imgui.ImVec2(-1, 60))
            if changed_g:
                track['goal'] = new_g
            # Buttons
            if imgui.button(f"Remove##{idx}"):
                self.proposed_tracks.pop(idx)
                # The list was mutated; skip the remaining rows for this frame.
                break
            imgui.same_line()
            if imgui.button(f"Start This Track##{idx}"):
                self._cb_start_track(idx)
            imgui.separator()

    if imgui.button("Accept", imgui.ImVec2(120, 0)):
        self._cb_accept_tracks()
        self._show_track_proposal_modal = False
        imgui.close_current_popup()
    imgui.same_line()
    if imgui.button("Cancel", imgui.ImVec2(120, 0)):
        self._show_track_proposal_modal = False
        imgui.close_current_popup()
    imgui.end_popup()
def _render_log_management(self) -> None:
    """Render the "Log Management" window: one table row per logged session.

    Each row shows the session id, start time, whitelist ("star") state and
    stored metadata, plus a Star / Unstar button that rewrites the session's
    metadata through the registry.
    """
    expanded, self.show_windows["Log Management"] = imgui.begin("Log Management", self.show_windows["Log Management"])
    if expanded:
        # NOTE(review): a LogRegistry is constructed on every call; if its
        # constructor reads the file, that is disk I/O per frame. Confirm.
        registry = LogRegistry("logs/log_registry.toml")
        sessions = registry.data
        table_flags = imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable
        if imgui.begin_table("sessions_table", 7, table_flags):
            for heading in ("Session ID", "Start Time", "Star", "Reason", "Size (KB)", "Msgs", "Actions"):
                imgui.table_setup_column(heading)
            imgui.table_headers_row()
            for session_id, s_data in sessions.items():
                imgui.table_next_row()
                imgui.table_next_column()
                imgui.text(session_id)
                imgui.table_next_column()
                imgui.text(s_data.get("start_time", ""))
                imgui.table_next_column()
                whitelisted = s_data.get("whitelisted", False)
                if whitelisted:
                    imgui.text_colored(vec4(255, 215, 0), "YES")
                else:
                    imgui.text("NO")
                metadata = s_data.get("metadata") or {}
                imgui.table_next_column()
                imgui.text(metadata.get("reason", ""))
                imgui.table_next_column()
                imgui.text(str(metadata.get("size_kb", "")))
                imgui.table_next_column()
                imgui.text(str(metadata.get("message_count", "")))
                imgui.table_next_column()
                # Unstarring keeps the stored reason; starring records a manual one.
                if whitelisted:
                    button_label = f"Unstar##{session_id}"
                    new_state = False
                    new_reason = metadata.get("reason")
                else:
                    button_label = f"Star##{session_id}"
                    new_state = True
                    new_reason = "Manually whitelisted"
                if imgui.button(button_label):
                    registry.update_session_metadata(
                        session_id,
                        message_count=metadata.get("message_count"),
                        errors=metadata.get("errors"),
                        size_kb=metadata.get("size_kb"),
                        whitelisted=new_state,
                        reason=new_reason
                    )
            imgui.end_table()
    imgui.end()
def _render_files_panel(self) -> None:
    """Render the context-files panel: base directory, path list, add buttons.

    Paths can be removed with the per-row "x" button. "Add File(s)" and
    "Add Wildcard" both skip entries that are already in ``self.files``, so
    the same path or glob is never listed twice.
    """
    imgui.text("Base Dir")
    _, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
    imgui.same_line()
    if imgui.button("Browse##fb"):
        r = hide_tk_root()
        d = filedialog.askdirectory()
        r.destroy()
        if d:
            self.ui_files_base_dir = d
    imgui.separator()
    imgui.text("Paths")
    # Negative height leaves 40 px below the list for the add buttons.
    imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True)
    for i, f in enumerate(self.files):
        if imgui.button(f"x##f{i}"):
            self.files.pop(i)
            # The list was mutated; stop drawing rows for this frame.
            break
        imgui.same_line()
        imgui.text(f)
    imgui.end_child()
    if imgui.button("Add File(s)"):
        r = hide_tk_root()
        paths = filedialog.askopenfilenames()
        r.destroy()
        for p in paths:
            if p not in self.files:
                self.files.append(p)
    imgui.same_line()
    if imgui.button("Add Wildcard"):
        r = hide_tk_root()
        d = filedialog.askdirectory()
        r.destroy()
        if d:
            pattern = str(Path(d) / "**" / "*")
            # Same duplicate guard as "Add File(s)".
            if pattern not in self.files:
                self.files.append(pattern)
def _render_screenshots_panel(self) -> None:
    """Render the screenshots panel: base directory, path list, add button.

    Paths can be removed with the per-row "x" button; newly picked images
    are appended only when not already listed.
    """
    imgui.text("Base Dir")
    _, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
    imgui.same_line()
    if imgui.button("Browse##sb"):
        tk_root = hide_tk_root()
        chosen_dir = filedialog.askdirectory()
        tk_root.destroy()
        if chosen_dir:
            self.ui_shots_base_dir = chosen_dir
    imgui.separator()

    imgui.text("Paths")
    # Negative height leaves 40 px below the list for the add button.
    imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True)
    for row, shot_path in enumerate(self.screenshots):
        if imgui.button(f"x##s{row}"):
            self.screenshots.pop(row)
            # The list was mutated; stop drawing rows for this frame.
            break
        imgui.same_line()
        imgui.text(shot_path)
    imgui.end_child()

    if not imgui.button("Add Screenshot(s)"):
        return
    tk_root = hide_tk_root()
    picked = filedialog.askopenfilenames(
        title="Select Screenshots",
        filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")],
    )
    tk_root.destroy()
    for candidate in picked:
        if candidate not in self.screenshots:
            self.screenshots.append(candidate)
def _render_discussion_panel(self) -> None:
    """Draw the discussion panel.

    Two modes:

    * While ``self.is_viewing_prior_session`` is set, only a read-only list of
      ``self.prior_session_entries`` is shown and the method returns early.
    * Otherwise it draws the discussion selector and metadata, the entry
      toolbar (add / collapse / clear / save / truncate), the role editor and
      the editable list of ``self.disc_entries``.

    Insert and delete requests made from inside the clipped entry list are
    recorded and applied after the list has been drawn, so the list length
    never changes while the clipper is still handing out index ranges.
    """
    # THINKING indicator
    is_thinking = self.ai_status in ["sending..."]
    if is_thinking:
        # Square-wave blink: alpha flips between 0 and 1 with the sine's sign.
        val = math.sin(time.time() * 10 * math.pi)
        alpha = 1.0 if val > 0 else 0.0
        imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...")
        imgui.separator()
    # Prior session viewing mode
    if self.is_viewing_prior_session:
        imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20))
        imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
        imgui.same_line()
        if imgui.button("Exit Prior Session"):
            self.is_viewing_prior_session = False
            self.prior_session_entries.clear()
        imgui.separator()
        imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False)
        for idx, entry in enumerate(self.prior_session_entries):
            imgui.push_id(f"prior_{idx}")
            kind = entry.get("kind", entry.get("type", ""))
            imgui.text_colored(C_LBL, f"#{idx+1}")
            imgui.same_line()
            ts = entry.get("ts", entry.get("timestamp", ""))
            if ts:
                imgui.text_colored(vec4(160, 160, 160), str(ts))
                imgui.same_line()
            imgui.text_colored(C_KEY, str(kind))
            payload = entry.get("payload", entry)
            if isinstance(payload, dict):
                text = payload.get("text", payload.get("message", payload.get("content", "")))
            else:
                # A payload that is not a mapping is shown as-is.
                text = payload
            if text:
                # Flatten both escaped ("\\n") and real newlines for the preview.
                preview = str(text).replace("\\n", " ").replace("\n", " ")[:200]
                if self.ui_word_wrap:
                    imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                    imgui.text(preview)
                    imgui.pop_text_wrap_pos()
                else:
                    imgui.text(preview)
            imgui.separator()
            imgui.pop_id()
        imgui.end_child()
        imgui.pop_style_color()
        return
    # From here on we are never in prior-session mode (handled above).
    if imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open):
        names = self._get_discussion_names()
        if imgui.begin_combo("##disc_sel", self.active_discussion):
            for name in names:
                is_selected = (name == self.active_discussion)
                if imgui.selectable(name, is_selected)[0]:
                    self._switch_discussion(name)
                if is_selected:
                    imgui.set_item_default_focus()
            imgui.end_combo()
        if self.active_track:
            imgui.same_line()
            changed, self._track_discussion_active = imgui.checkbox("Track Discussion", self._track_discussion_active)
            if changed:
                if self._track_discussion_active:
                    # Persist the project discussion, then swap in the track's history.
                    self._flush_disc_entries_to_project()
                    history_strings = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
                    with self._disc_entries_lock:
                        self.disc_entries = _parse_history_entries(history_strings, self.disc_roles)
                    self.ai_status = f"track discussion: {self.active_track.id}"
                else:
                    self._flush_disc_entries_to_project()
                    # Restore project discussion
                    self._switch_discussion(self.active_discussion)
        disc_sec = self.project.get("discussion", {})
        disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
        git_commit = disc_data.get("git_commit", "")
        last_updated = disc_data.get("last_updated", "")
        imgui.text_colored(C_LBL, "commit:")
        imgui.same_line()
        imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)")
        imgui.same_line()
        if imgui.button("Update Commit"):
            git_dir = self.ui_project_git_dir
            if git_dir:
                cmt = project_manager.get_git_commit(git_dir)
                if cmt:
                    disc_data["git_commit"] = cmt
                    disc_data["last_updated"] = project_manager.now_ts()
                    self.ai_status = f"commit: {cmt[:12]}"
        imgui.text_colored(C_LBL, "updated:")
        imgui.same_line()
        imgui.text_colored(C_SUB, last_updated if last_updated else "(never)")
        _, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input)
        imgui.same_line()
        if imgui.button("Create"):
            nm = self.ui_disc_new_name_input.strip()
            if nm:
                self._create_discussion(nm)
                self.ui_disc_new_name_input = ""
        imgui.same_line()
        if imgui.button("Rename"):
            nm = self.ui_disc_new_name_input.strip()
            if nm:
                self._rename_discussion(self.active_discussion, nm)
                self.ui_disc_new_name_input = ""
        imgui.same_line()
        if imgui.button("Delete"):
            self._delete_discussion(self.active_discussion)
    imgui.separator()
    if imgui.button("+ Entry"):
        self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
    imgui.same_line()
    if imgui.button("-All"):
        for e in self.disc_entries:
            e["collapsed"] = True
    imgui.same_line()
    if imgui.button("+All"):
        for e in self.disc_entries:
            e["collapsed"] = False
    imgui.same_line()
    if imgui.button("Clear All"):
        self.disc_entries.clear()
    imgui.same_line()
    if imgui.button("Save"):
        self._flush_to_project()
        self._save_active_project()
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "discussion saved"
    _, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history)
    # Truncation controls
    imgui.text("Keep Pairs:")
    imgui.same_line()
    imgui.set_next_item_width(80)
    _, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1)
    if self.ui_disc_truncate_pairs < 1:
        self.ui_disc_truncate_pairs = 1
    imgui.same_line()
    if imgui.button("Truncate"):
        with self._disc_entries_lock:
            self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs)
        self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs"
    imgui.separator()
    if imgui.collapsing_header("Roles"):
        imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True)
        for i, r in enumerate(self.disc_roles):
            if imgui.button(f"x##r{i}"):
                self.disc_roles.pop(i)
                break
            imgui.same_line()
            imgui.text(r)
        imgui.end_child()
        _, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input)
        imgui.same_line()
        if imgui.button("Add"):
            r = self.ui_disc_new_role_input.strip()
            if r and r not in self.disc_roles:
                self.disc_roles.append(r)
                self.ui_disc_new_role_input = ""
    imgui.separator()
    imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False)
    # Structural edits requested this frame; applied once the list is drawn.
    pending_insert = None
    pending_delete = None
    clipper = imgui.ListClipper()
    clipper.begin(len(self.disc_entries))
    while clipper.step():
        for i in range(clipper.display_start, clipper.display_end):
            entry = self.disc_entries[i]
            imgui.push_id(str(i))
            collapsed = entry.get("collapsed", False)
            read_mode = entry.get("read_mode", False)
            if imgui.button("+" if collapsed else "-"):
                entry["collapsed"] = not collapsed
            imgui.same_line()
            imgui.set_next_item_width(120)
            if imgui.begin_combo("##role", entry["role"]):
                for r in self.disc_roles:
                    if imgui.selectable(r, r == entry["role"])[0]:
                        entry["role"] = r
                imgui.end_combo()
            if not collapsed:
                imgui.same_line()
                if imgui.button("[Edit]" if read_mode else "[Read]"):
                    entry["read_mode"] = not read_mode
            ts_str = entry.get("ts", "")
            if ts_str:
                imgui.same_line()
                imgui.text_colored(vec4(120, 120, 100), str(ts_str))
            if collapsed:
                imgui.same_line()
                if imgui.button("Ins"):
                    pending_insert = i
                imgui.same_line()
                self._render_text_viewer(f"Entry #{i+1}", entry["content"])
                imgui.same_line()
                if imgui.button("Del"):
                    pending_delete = i
                imgui.same_line()
                # Flatten both escaped ("\\n") and real newlines for the one-line preview.
                preview = entry["content"].replace("\\n", " ").replace("\n", " ")[:60]
                if len(entry["content"]) > 60:
                    preview += "..."
                imgui.text_colored(vec4(160, 160, 150), preview)
            if not collapsed:
                if read_mode:
                    imgui.begin_child("read_content", imgui.ImVec2(0, 150), True)
                    if self.ui_word_wrap:
                        imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                    imgui.text(entry["content"])
                    if self.ui_word_wrap:
                        imgui.pop_text_wrap_pos()
                    imgui.end_child()
                else:
                    _, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150))
            imgui.separator()
            imgui.pop_id()
    # Apply the deferred edit now that the clipper is done with its ranges.
    if pending_delete is not None:
        self.disc_entries.pop(pending_delete)
    elif pending_insert is not None:
        self.disc_entries.insert(pending_insert, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
    if self._scroll_disc_to_bottom:
        imgui.set_scroll_here_y(1.0)
        self._scroll_disc_to_bottom = False
    imgui.end_child()
def _render_provider_panel(self) -> None:
    """Draw the provider panel: provider/model pickers, parameters, telemetry.

    Widget interaction updates ``self.current_provider``,
    ``self.current_model``, ``self.temperature``, ``self.max_tokens`` and
    ``self.history_trunc_limit``. When the provider is ``"gemini_cli"`` an
    extra section shows the CLI session id and lets the user reset the
    session or change the binary path; a changed path (typed or picked via
    the file dialog) is pushed to ``ai_client._gemini_cli_adapter`` when one
    exists. Telemetry is read from ``self.session_usage``.
    """
    imgui.text("Provider")
    if imgui.begin_combo("##prov", self.current_provider):
        for p in PROVIDERS:
            if imgui.selectable(p, p == self.current_provider)[0]:
                self.current_provider = p
        imgui.end_combo()
    imgui.separator()
    imgui.text("Model")
    imgui.same_line()
    if imgui.button("Fetch Models"):
        self._fetch_models(self.current_provider)
    if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)):
        for m in self.available_models:
            if imgui.selectable(m, m == self.current_model)[0]:
                self.current_model = m
        imgui.end_list_box()
    imgui.separator()
    imgui.text("Parameters")
    _, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f")
    _, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024)
    _, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024)
    if self.current_provider == "gemini_cli":
        imgui.separator()
        imgui.text("Gemini CLI")
        adapter = getattr(ai_client, "_gemini_cli_adapter", None)
        sid = "None"
        if adapter:
            sid = adapter.session_id or "None"
        imgui.text(f"Session ID: {sid}")
        if imgui.button("Reset CLI Session"):
            ai_client.reset_session()
        imgui.text("Binary Path")
        path_changed, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path)
        imgui.same_line()
        if imgui.button("Browse##gcli"):
            root = hide_tk_root()
            try:
                picked = filedialog.askopenfilename(title="Select gemini CLI binary")
            finally:
                # Tear the hidden root down even if the dialog raises.
                root.destroy()
            if picked:
                self.ui_gemini_cli_path = picked
                # A browsed path must reach the adapter just like a typed one.
                path_changed = True
        if path_changed:
            # Look the adapter up again: the reset button above may have run this frame.
            adapter = getattr(ai_client, "_gemini_cli_adapter", None)
            if adapter:
                adapter.binary_path = self.ui_gemini_cli_path
    imgui.separator()
    imgui.text("Telemetry")
    usage = self.session_usage
    # Read every counter with a default so a partially filled usage dict cannot raise.
    in_tok = usage.get("input_tokens", 0)
    out_tok = usage.get("output_tokens", 0)
    total = in_tok + out_tok
    if total == 0 and usage.get("total_tokens", 0) > 0:
        total = usage["total_tokens"]
    imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {in_tok:,} Out: {out_tok:,})")
    if usage.get("last_latency", 0.0) > 0:
        imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s")
    cache_read = usage.get("cache_read_input_tokens", 0)
    if cache_read:
        cache_created = usage.get("cache_creation_input_tokens", 0)
        imgui.text_colored(C_LBL, f" Cache Read: {cache_read:,} Creation: {cache_created:,}")
    imgui.text("Token Budget:")
    imgui.progress_bar(self._token_budget_pct, imgui.ImVec2(-1, 0), f"{self._token_budget_current:,} / {self._token_budget_limit:,}")
    if self._gemini_cache_text:
        imgui.text_colored(C_SUB, self._gemini_cache_text)
def _render_message_panel(self) -> None:
    """Draw the message composer: LIVE indicator, input box and action buttons.

    Ctrl+L clears the input; Ctrl+Enter acts like the "Gen + Send" button.
    Sending is suppressed while ``self.send_thread`` is still alive.
    """
    live_states = ("running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI...")
    if self.ai_status in live_states:
        # Square-wave blink driven by the sign of a fast sine.
        wave = math.sin(time.time() * 10 * math.pi)
        if wave > 0:
            blink = 1.0
        else:
            blink = 0.0
        imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, blink), "LIVE")
        imgui.separator()
    _, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40))

    # Keyboard shortcuts
    io = imgui.get_io()
    ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter)
    if io.key_ctrl and imgui.is_key_pressed(imgui.Key.l):
        self.ui_ai_input = ""
    imgui.separator()

    # Snapshot the worker state under the lock, then act on the snapshot.
    with self._send_thread_lock:
        worker = self.send_thread
        send_busy = bool(worker and worker.is_alive())

    send_clicked = imgui.button("Gen + Send")
    if (send_clicked or ctrl_enter) and not send_busy:
        self._handle_generate_send()
    imgui.same_line()
    if imgui.button("MD Only"):
        self._handle_md_only()
    imgui.same_line()
    if imgui.button("Reset"):
        self._handle_reset_session()
    imgui.same_line()
    if imgui.button("-> History") and self.ui_ai_input:
        new_entry = {
            "role": "User",
            "content": self.ui_ai_input,
            "collapsed": False,
            "ts": project_manager.now_ts(),
        }
        self.disc_entries.append(new_entry)
def _render_response_panel(self) -> None:
    """Draw the AI response view, with a short green blink when triggered.

    Setting ``self._trigger_blink`` starts a 1.5 second blink and attempts to
    focus the "Response" window. While blinking, the frame and child
    backgrounds alternate between transparent and a faint green. The response
    text is always rendered, either wrapped in a child window or in a
    read-only multiline input, followed by a "-> History" button that appends
    the response to ``self.disc_entries``.
    """
    if self._trigger_blink:
        self._trigger_blink = False
        self._is_blinking = True
        self._blink_start_time = time.time()
        try:
            imgui.set_window_focus("Response")
        except Exception:
            # Focusing is best-effort; never let it break rendering. Unlike a
            # bare except this still lets KeyboardInterrupt/SystemExit through.
            pass
    is_blinking = False
    if self._is_blinking:
        elapsed = time.time() - self._blink_start_time
        if elapsed > 1.5:
            self._is_blinking = False
        else:
            is_blinking = True
            val = math.sin(elapsed * 8 * math.pi)
            alpha = 50/255 if val > 0 else 0
            # Two colors pushed here are popped at the end of this method.
            imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha))
            imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha))
    # --- Always Render Content ---
    if self.ui_word_wrap:
        imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True)
        imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
        imgui.text(self.ai_response)
        imgui.pop_text_wrap_pos()
        imgui.end_child()
    else:
        imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only)
    imgui.separator()
    if imgui.button("-> History"):
        if self.ai_response:
            self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
    if is_blinking:
        imgui.pop_style_color(2)
def _cb_ticket_retry(self, ticket_id: str) -> None:
    """Mark the first ticket with ``ticket_id`` as ``'todo'`` and queue an ``mma_retry`` event.

    The event is scheduled on ``self._loop`` whether or not a matching ticket
    was found in ``self.active_tickets``.
    """
    match = next((t for t in self.active_tickets if t.get('id') == ticket_id), None)
    if match is not None:
        match['status'] = 'todo'
    pending_put = self.event_queue.put("mma_retry", {"ticket_id": ticket_id})
    asyncio.run_coroutine_threadsafe(pending_put, self._loop)
def _cb_ticket_skip(self, ticket_id: str) -> None:
    """Mark the first ticket with ``ticket_id`` as ``'skipped'`` and queue an ``mma_skip`` event.

    The event is scheduled on ``self._loop`` whether or not a matching ticket
    was found in ``self.active_tickets``.
    """
    match = next((t for t in self.active_tickets if t.get('id') == ticket_id), None)
    if match is not None:
        match['status'] = 'skipped'
    pending_put = self.event_queue.put("mma_skip", {"ticket_id": ticket_id})
    asyncio.run_coroutine_threadsafe(pending_put, self._loop)
def _cb_run_conductor_setup(self) -> None:
    """Scan ``conductor/`` (relative to the working directory) and store a text report.

    The report, written to ``self.ui_conductor_setup_summary``, lists the
    directory's absolute path, every file with its line count (or
    "Error reading" when it cannot be read as UTF-8 text), the total line
    count, and how many sub-directories exist under ``conductor/tracks``.
    """
    root = Path("conductor")
    if not root.exists():
        self.ui_conductor_setup_summary = "Error: conductor/ directory not found."
        return

    found = [p for p in root.glob("**/*") if p.is_file()]
    report = [
        f"Conductor Directory: {root.absolute()}",
        f"Total Files: {len(found)}",
    ]

    line_total = 0
    for path in found:
        rel = path.relative_to(root)
        try:
            with open(path, "r", encoding="utf-8") as handle:
                count = sum(1 for _ in handle)
        except Exception:
            # Unreadable or non-text files are reported, not fatal.
            report.append(f"- {rel}: Error reading")
        else:
            line_total += count
            report.append(f"- {rel}: {count} lines")
    report.append(f"Total Line Count: {line_total}")

    track_root = root / "tracks"
    if track_root.exists():
        track_count = sum(1 for child in track_root.iterdir() if child.is_dir())
        report.append(f"Total Tracks Found: {track_count}")
    else:
        report.append("Tracks Directory: Not found")

    self.ui_conductor_setup_summary = "\n".join(report)
def _cb_create_track(self, name: str, desc: str, track_type: str) -> None:
    """Create a new track directory under ``conductor/tracks`` and refresh ``self.tracks``.

    The track id is the lower-cased name with whitespace and characters that
    are unsafe in a single path component replaced by ``_``, followed by
    ``_YYYYMMDD``. Three files are written into the track directory:
    ``spec.md``, ``plan.md`` and ``metadata.json``.

    Args:
        name: Human-readable track title. Empty or whitespace-only names are
            ignored and nothing is created.
        desc: Free-form description stored in the spec and the metadata.
        track_type: Track category string stored in the spec and the metadata.
    """
    if not name or not name.strip():
        return
    import re
    from datetime import datetime
    date_suffix = datetime.now().strftime("%Y%m%d")
    # Keep the id a single, safe path component: no separators, no characters
    # reserved on common filesystems, and no leading dots (".." would escape).
    slug = re.sub(r'[\s\\/:*?"<>|]', "_", name.lower()).lstrip(".")
    track_id = f"{slug}_{date_suffix}"
    track_dir = Path("conductor/tracks") / track_id
    track_dir.mkdir(parents=True, exist_ok=True)
    (track_dir / "spec.md").write_text(
        f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n",
        encoding="utf-8",
    )
    (track_dir / "plan.md").write_text(
        f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n",
        encoding="utf-8",
    )
    metadata = {
        "id": track_id,
        "title": name,
        "description": desc,
        "type": track_type,
        "status": "new",
        "progress": 0.0,
    }
    (track_dir / "metadata.json").write_text(json.dumps(metadata, indent=1), encoding="utf-8")
    # Refresh tracks from disk
    self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
def _push_mma_state_update(self) -> None:
    """Copy the ticket dicts back onto the active track and save its state.

    Does nothing when there is no active track. ``created_at`` and the
    stored discussion are carried over from the previously saved state when
    one is returned by ``load_track_state``; otherwise they start fresh.
    """
    track = self.active_track
    if not track:
        return

    # Ticket dicts edited in the UI -> Ticket objects on the track.
    track.tickets = [Ticket.from_dict(raw) for raw in self.active_tickets]

    from project_manager import save_track_state, load_track_state
    from models import TrackState, Metadata
    from datetime import datetime

    base_dir = self.ui_files_base_dir
    previous = load_track_state(track.id, base_dir)
    if previous:
        created = previous.metadata.created_at
        history = previous.discussion
    else:
        created = datetime.now()
        history = []

    meta = Metadata(
        id=track.id,
        name=track.description,
        status=self.mma_status,
        created_at=created,
        updated_at=datetime.now(),
    )
    snapshot = TrackState(metadata=meta, discussion=history, tasks=track.tickets)
    save_track_state(track.id, snapshot, base_dir)
def _render_tool_calls_panel(self) -> None:
    """Draw the tool call history: one script box and one output box per call.

    Entries come from ``self._tool_log`` (3-tuples whose first two items are
    the script and its result). Each box has a "[+]" button that opens the
    full text in the shared text viewer.
    """

    def draw_section(label, body, button_id, viewer_title, wrap_id, fixed_id, input_id):
        # One labelled box: wrapped text or a horizontally scrolling read-only field.
        imgui.text_colored(C_LBL, label)
        imgui.same_line()
        if imgui.button(button_id):
            self.show_text_viewer = True
            self.text_viewer_title = viewer_title
            self.text_viewer_content = body
        if self.ui_word_wrap:
            imgui.begin_child(wrap_id, imgui.ImVec2(-1, 72), True)
            imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
            imgui.text(body)
            imgui.pop_text_wrap_pos()
            imgui.end_child()
        else:
            imgui.begin_child(fixed_id, imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar)
            imgui.input_text_multiline(input_id, body, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
            imgui.end_child()

    imgui.text("Tool call history")
    imgui.same_line()
    if imgui.button("Clear##tc"):
        self._tool_log.clear()
    imgui.separator()
    imgui.begin_child("scroll_area")
    clipper = imgui.ListClipper()
    clipper.begin(len(self._tool_log))
    while clipper.step():
        for row in range(clipper.display_start, clipper.display_end):
            call_no = row + 1  # calls are numbered from 1 in the UI
            script, result, _ = self._tool_log[row]
            stripped = script.strip()
            if stripped:
                first_line = stripped.splitlines()[0][:80]
            else:
                first_line = "(empty)"
            imgui.text_colored(C_KEY, f"Call #{call_no}: {first_line}")
            # Script Display
            draw_section(
                "Script:", script,
                f"[+]##script_{call_no}", f"Call Script #{call_no}",
                f"tc_script_wrap_{call_no}", f"tc_script_fixed_width_{call_no}", f"##tc_script_res_{call_no}",
            )
            # Result Display
            draw_section(
                "Output:", result,
                f"[+]##output_{call_no}", f"Call Output #{call_no}",
                f"tc_res_wrap_{call_no}", f"tc_res_fixed_width_{call_no}", f"##tc_res_val_{call_no}",
            )
            imgui.separator()
    imgui.end_child()
def _render_comms_history_panel(self) -> None:
    """Draw a compact comms log: status line, controls, and a clipped entry list.

    NOTE(review): another method named ``_render_comms_history_panel`` is
    defined later in this file (after ``_render_ticket_dag_node``). If both
    live in the same class, the later definition replaces this one and this
    body never runs. This version also calls ``self._cb_load_prior_log()``
    whereas the later one calls ``self.cb_load_prior_log()`` -- confirm which
    definition and which callback name are intended.
    """
    imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
    imgui.same_line()
    if imgui.button("Clear##comms"):
        # Clear both the client-side log and the panel's own copy.
        ai_client.clear_comms_log()
        self._comms_log.clear()
    imgui.same_line()
    if imgui.button("Load Log"):
        self._cb_load_prior_log()
    if self.is_viewing_prior_session:
        imgui.same_line()
        if imgui.button("Exit Prior Session"):
            self.is_viewing_prior_session = False
            self.prior_session_entries.clear()
            self.ai_status = "idle"
        imgui.separator()
        imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
    imgui.separator()
    imgui.begin_child("scroll_area")
    # The clipper limits the loop to the index range it reports for this frame.
    clipper = imgui.ListClipper()
    clipper.begin(len(self._comms_log))
    while clipper.step():
        for i in range(clipper.display_start, clipper.display_end):
            entry = self._comms_log[i]
            imgui.text_colored(C_KEY, f"[{entry.get('direction')}] {entry.get('type')}")
            imgui.same_line()
            if imgui.button(f"[+]##c{i}"):
                # Open the full payload, pretty-printed as JSON, in the text viewer.
                self.show_text_viewer = True
                self.text_viewer_title = f"Comms Entry #{i}"
                self.text_viewer_content = json.dumps(entry.get("payload"), indent=2)
            # Inline preview: first 200 characters of the payload's str() form;
            # the "..." suffix is appended unconditionally.
            imgui.text_unformatted(str(entry.get("payload"))[:200] + "...")
            imgui.separator()
    imgui.end_child()
def _render_mma_dashboard(self) -> None:
    """Draw the MMA dashboard.

    Sections, top to bottom: a one-line summary (track, ticket progress,
    estimated cost, status), the conductor setup scan, the track browser
    table, the new-track form, global controls with the approval-pending
    indicator, active-track progress, the per-tier usage table, per-tier
    model selection, and -- when a track is active -- the ticket DAG and the
    add-ticket form.

    Reads ``self.tracks``, ``self.active_track``, ``self.active_tickets``,
    ``self.mma_tier_usage`` and ``self.mma_status``; widget interaction
    updates the ``ui_new_track_*`` / ``ui_new_ticket_*`` fields, the tier
    model selection, and may append to ``self.active_tickets``.
    """
    # Task 5.3: Dense Summary Line
    track_name = self.active_track.description if self.active_track else "None"
    total_tickets = len(self.active_tickets)
    done_tickets = sum(1 for t in self.active_tickets if t.get('status') == 'complete')
    total_cost = 0.0
    for stats in self.mma_tier_usage.values():
        model = stats.get('model', 'unknown')
        in_t = stats.get('input', 0)
        out_t = stats.get('output', 0)
        total_cost += cost_tracker.estimate_cost(model, in_t, out_t)
    imgui.text("Track:")
    imgui.same_line()
    imgui.text_colored(C_VAL, track_name)
    imgui.same_line()
    imgui.text(" | Tickets:")
    imgui.same_line()
    imgui.text_colored(C_VAL, f"{done_tickets}/{total_tickets}")
    imgui.same_line()
    imgui.text(" | Cost:")
    imgui.same_line()
    imgui.text_colored(imgui.ImVec4(0, 1, 0, 1), f"${total_cost:,.4f}")
    imgui.same_line()
    imgui.text(" | Status:")
    imgui.same_line()
    status_col = imgui.ImVec4(1, 1, 1, 1)
    if self.mma_status == "idle":
        status_col = imgui.ImVec4(0.7, 0.7, 0.7, 1)
    elif self.mma_status == "running":
        status_col = imgui.ImVec4(1, 1, 0, 1)
    elif self.mma_status == "done":
        status_col = imgui.ImVec4(0, 1, 0, 1)
    elif self.mma_status == "error":
        status_col = imgui.ImVec4(1, 0, 0, 1)
    imgui.text_colored(status_col, self.mma_status.upper())
    imgui.separator()
    # 0. Conductor Setup
    if imgui.collapsing_header("Conductor Setup"):
        if imgui.button("Run Setup Scan"):
            self._cb_run_conductor_setup()
        if self.ui_conductor_setup_summary:
            imgui.input_text_multiline("##setup_summary", self.ui_conductor_setup_summary, imgui.ImVec2(-1, 120), imgui.InputTextFlags_.read_only)
    imgui.separator()
    # 1. Track Browser
    imgui.text("Track Browser")
    if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
        imgui.table_setup_column("Title")
        imgui.table_setup_column("Status")
        imgui.table_setup_column("Progress")
        imgui.table_setup_column("Actions")
        imgui.table_headers_row()
        for track in self.tracks:
            imgui.table_next_row()
            imgui.table_next_column()
            imgui.text(track.get("title", "Untitled"))
            imgui.table_next_column()
            # Tolerate a missing or null status in the track metadata.
            status = str(track.get("status") or "unknown").lower()
            if status == "new":
                imgui.text_colored(imgui.ImVec4(0.7, 0.7, 0.7, 1.0), "NEW")
            elif status == "active":
                imgui.text_colored(imgui.ImVec4(1.0, 1.0, 0.0, 1.0), "ACTIVE")
            elif status == "done":
                imgui.text_colored(imgui.ImVec4(0.0, 1.0, 0.0, 1.0), "DONE")
            elif status == "blocked":
                imgui.text_colored(imgui.ImVec4(1.0, 0.0, 0.0, 1.0), "BLOCKED")
            else:
                imgui.text(status)
            imgui.table_next_column()
            # Tolerate a missing or null progress value.
            progress = track.get("progress") or 0.0
            if progress < 0.33:
                p_color = imgui.ImVec4(1.0, 0.0, 0.0, 1.0)
            elif progress < 0.66:
                p_color = imgui.ImVec4(1.0, 1.0, 0.0, 1.0)
            else:
                p_color = imgui.ImVec4(0.0, 1.0, 0.0, 1.0)
            imgui.push_style_color(imgui.Col_.plot_histogram, p_color)
            imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%")
            imgui.pop_style_color()
            imgui.table_next_column()
            if imgui.button(f"Load##{track.get('id')}"):
                self._cb_load_track(track.get("id"))
        imgui.end_table()
    # 1b. New Track Form
    imgui.text("Create New Track")
    _, self.ui_new_track_name = imgui.input_text("Name##new_track", self.ui_new_track_name)
    _, self.ui_new_track_desc = imgui.input_text_multiline("Description##new_track", self.ui_new_track_desc, imgui.ImVec2(-1, 60))
    imgui.text("Type:")
    imgui.same_line()
    if imgui.begin_combo("##track_type", self.ui_new_track_type):
        for ttype in ["feature", "chore", "fix"]:
            if imgui.selectable(ttype, self.ui_new_track_type == ttype)[0]:
                self.ui_new_track_type = ttype
        imgui.end_combo()
    if imgui.button("Create Track"):
        # Only create (and clear the form) when a name was entered, so a typed
        # description is not thrown away by a click with a blank name.
        if self.ui_new_track_name.strip():
            self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type)
            self.ui_new_track_name = ""
            self.ui_new_track_desc = ""
    imgui.separator()
    # 2. Global Controls
    changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)
    if changed:
        # We could push an event here if the engine needs to know immediately
        pass
    imgui.same_line()
    imgui.text(f"Status: {self.mma_status.upper()}")
    if self.active_tier:
        imgui.same_line()
        imgui.text_colored(C_VAL, f"| Active: {self.active_tier}")
    # Approval pending indicator
    any_pending = (
        self._pending_mma_spawn is not None or
        self._pending_mma_approval is not None or
        self._pending_ask_dialog
    )
    if any_pending:
        # Smooth pulse: alpha follows |sin| of the wall clock.
        alpha = abs(math.sin(time.time() * 5))
        imgui.same_line()
        imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.3, alpha), " APPROVAL PENDING")
        imgui.same_line()
        if imgui.button("Go to Approval"):
            pass  # scroll/focus handled by existing dialog rendering
    imgui.separator()
    # 2. Active Track Info
    if self.active_track:
        imgui.text(f"Track: {self.active_track.description}")
        # Progress bar
        tickets = self.active_tickets
        total = len(tickets)
        if total > 0:
            complete = sum(1 for t in tickets if t.get('status') == 'complete')
            progress = complete / total
            imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets")
    else:
        imgui.text_disabled("No active MMA track.")
    # 3. Token Usage Table
    imgui.separator()
    imgui.text("Tier Usage (Tokens & Cost)")
    if imgui.begin_table("mma_usage", 5, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg):
        imgui.table_setup_column("Tier")
        imgui.table_setup_column("Model")
        imgui.table_setup_column("Input")
        imgui.table_setup_column("Output")
        imgui.table_setup_column("Est. Cost")
        imgui.table_headers_row()
        usage = self.mma_tier_usage
        total_cost = 0.0
        for tier, stats in usage.items():
            imgui.table_next_row()
            imgui.table_next_column()
            imgui.text(tier)
            imgui.table_next_column()
            model = stats.get('model', 'unknown')
            imgui.text(model)
            imgui.table_next_column()
            in_t = stats.get('input', 0)
            imgui.text(f"{in_t:,}")
            imgui.table_next_column()
            out_t = stats.get('output', 0)
            imgui.text(f"{out_t:,}")
            imgui.table_next_column()
            cost = cost_tracker.estimate_cost(model, in_t, out_t)
            total_cost += cost
            imgui.text(f"${cost:,.4f}")
        # Total Row
        imgui.table_next_row()
        imgui.table_set_bg_color(imgui.TableBgTarget_.row_bg0, imgui.get_color_u32(imgui.Col_.plot_lines_hovered))
        imgui.table_next_column()
        imgui.text("TOTAL")
        imgui.table_next_column()
        imgui.text("")
        imgui.table_next_column()
        imgui.text("")
        imgui.table_next_column()
        imgui.text("")
        imgui.table_next_column()
        imgui.text(f"${total_cost:,.4f}")
        imgui.end_table()
    imgui.separator()
    # 3b. Tier Model Config
    if imgui.collapsing_header("Tier Model Config"):
        for tier in self.mma_tier_usage.keys():
            imgui.text(f"{tier}:")
            imgui.same_line()
            current_model = self.mma_tier_usage[tier].get("model", "unknown")
            if imgui.begin_combo(f"##combo_{tier}", current_model):
                for model in self.available_models:
                    if imgui.selectable(model, current_model == model)[0]:
                        self.mma_tier_usage[tier]["model"] = model
                        # Mirror the choice into the project settings dict.
                        self.project.setdefault("mma", {}).setdefault("tier_models", {})[tier] = model
                imgui.end_combo()
    imgui.separator()
    # 4. Task DAG Visualizer
    imgui.text("Task DAG")
    if self.active_track:
        tickets_by_id = {t.get('id'): t for t in self.active_tickets}
        all_ids = set(tickets_by_id.keys())
        # Build children map
        children_map = {}
        for t in self.active_tickets:
            for dep in t.get('depends_on', []):
                children_map.setdefault(dep, []).append(t.get('id'))
        # Roots are those whose depends_on elements are NOT in all_ids
        roots = []
        for t in self.active_tickets:
            deps = t.get('depends_on', [])
            has_local_dep = any(d in all_ids for d in deps)
            if not has_local_dep:
                roots.append(t)
        rendered = set()
        for root in roots:
            self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered)
        # 5. Add Ticket Form
        imgui.separator()
        if imgui.button("Add Ticket"):
            self._show_add_ticket_form = not self._show_add_ticket_form
            if self._show_add_ticket_form:
                # Default Ticket ID: one past the highest numeric "T-<n>" id.
                max_id = 0
                for t in self.active_tickets:
                    tid = str(t.get('id') or '')
                    if tid.startswith('T-'):
                        try:
                            max_id = max(max_id, int(tid[2:]))
                        except ValueError:
                            # Non-numeric suffix: not part of the numbering.
                            pass
                self.ui_new_ticket_id = f"T-{max_id + 1:03d}"
                self.ui_new_ticket_desc = ""
                self.ui_new_ticket_target = ""
                self.ui_new_ticket_deps = ""
        if self._show_add_ticket_form:
            imgui.begin_child("add_ticket_form", imgui.ImVec2(-1, 220), True)
            imgui.text_colored(C_VAL, "New Ticket Details")
            _, self.ui_new_ticket_id = imgui.input_text("ID##new_ticket", self.ui_new_ticket_id)
            _, self.ui_new_ticket_desc = imgui.input_text_multiline("Description##new_ticket", self.ui_new_ticket_desc, imgui.ImVec2(-1, 60))
            _, self.ui_new_ticket_target = imgui.input_text("Target File##new_ticket", self.ui_new_ticket_target)
            _, self.ui_new_ticket_deps = imgui.input_text("Depends On (IDs, comma-separated)##new_ticket", self.ui_new_ticket_deps)
            if imgui.button("Create"):
                new_ticket = {
                    "id": self.ui_new_ticket_id,
                    "description": self.ui_new_ticket_desc,
                    "status": "todo",
                    "assigned_to": "tier3-worker",
                    "target_file": self.ui_new_ticket_target,
                    "depends_on": [d.strip() for d in self.ui_new_ticket_deps.split(",") if d.strip()]
                }
                self.active_tickets.append(new_ticket)
                self._show_add_ticket_form = False
                self._push_mma_state_update()
            imgui.same_line()
            if imgui.button("Cancel"):
                self._show_add_ticket_form = False
            imgui.end_child()
    else:
        imgui.text_disabled("No active MMA track.")
def _render_tier_stream_panel(self, tier_key: str, stream_key: str | None) -> None:
    """Draw streamed output for one tier.

    With a ``stream_key`` the matching entry of ``self.mma_streams`` is shown
    in a single child window. With ``stream_key=None`` every stream whose key
    contains "Tier 3" gets its own labelled, fixed-height child window. A
    child window scrolls to the bottom whenever the length of its text
    differs from the length recorded in ``self._tier_stream_last_len``.
    """
    if stream_key is None:
        worker_keys = [name for name in self.mma_streams if "Tier 3" in name]
        if worker_keys:
            for name in worker_keys:
                # Label is the part after the first ": ", or the whole key without one.
                _, sep, tail = name.partition(": ")
                ticket_id = tail if sep else name
                imgui.text(ticket_id)
                imgui.begin_child(f"##tier3_{ticket_id}_scroll", imgui.ImVec2(-1, 150), True)
                body = self.mma_streams[name]
                imgui.text_wrapped(body)
                try:
                    size = len(body)
                    if size != self._tier_stream_last_len.get(name, -1):
                        imgui.set_scroll_here_y(1.0)
                        self._tier_stream_last_len[name] = size
                except (TypeError, AttributeError):
                    pass
                imgui.end_child()
        else:
            imgui.text_disabled("No worker output yet.")
        return

    body = self.mma_streams.get(stream_key, "")
    imgui.begin_child(f"##stream_content_{tier_key}", imgui.ImVec2(-1, -1))
    imgui.text_wrapped(body)
    try:
        size = len(body)
        if size != self._tier_stream_last_len.get(stream_key, -1):
            imgui.set_scroll_here_y(1.0)
            self._tier_stream_last_len[stream_key] = size
    except (TypeError, AttributeError):
        pass
    imgui.end_child()
def _render_ticket_dag_node(self, ticket: Ticket, tickets_by_id: dict[str, Ticket], children_map: dict[str, list[str]], rendered: set[str]) -> None:
    """Draw one ticket as a tree node and recurse into the tickets that depend on it.

    Args:
        ticket: The ticket to draw; it is accessed through ``.get(...)`` like a mapping.
        tickets_by_id: Lookup from ticket id to ticket, used to resolve children.
        children_map: Maps a ticket id to the ids of tickets that list it in ``depends_on``.
        rendered: Ids already drawn during this pass. A ticket reached a second
            time is drawn as a leaf marked "(shown above)" and its children
            are not expanded again.

    Each row shows a status color bar, the id, target file, status text and
    Retry/Skip buttons; tickets in TODO or BLOCKED state also get a Delete
    button that removes the ticket, strips it from other tickets'
    ``depends_on`` lists and saves the track state.
    """
    tid = ticket.get('id', '??')
    is_duplicate = tid in rendered
    if not is_duplicate:
        rendered.add(tid)
    target = ticket.get('target_file', 'general')
    status = ticket.get('status', 'pending').upper()
    status_color = vec4(178, 178, 178)
    if status == 'RUNNING':
        status_color = vec4(255, 255, 0)
    elif status == 'COMPLETE':
        status_color = vec4(0, 255, 0)
    elif status in ['BLOCKED', 'ERROR']:
        status_color = vec4(255, 0, 0)
    elif status == 'PAUSED':
        status_color = vec4(255, 165, 0)
    # Thin colored bar to the left of the node, one text line tall.
    p_min = imgui.get_cursor_screen_pos()
    p_max = imgui.ImVec2(p_min.x + 4, p_min.y + imgui.get_text_line_height())
    imgui.get_window_draw_list().add_rect_filled(p_min, p_max, imgui.get_color_u32(status_color))
    imgui.set_cursor_screen_pos(imgui.ImVec2(p_min.x + 8, p_min.y))
    flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open
    children = children_map.get(tid, [])
    if not children or is_duplicate:
        flags |= imgui.TreeNodeFlags_.leaf
    node_open = imgui.tree_node_ex(f"##{tid}", flags)
    if imgui.is_item_hovered():
        imgui.begin_tooltip()
        imgui.text_colored(C_KEY, f"ID: {tid}")
        imgui.text_colored(C_LBL, f"Target: {target}")
        imgui.text_colored(C_LBL, f"Description:")
        imgui.same_line()
        imgui.text_wrapped(ticket.get('description', 'N/A'))
        deps = ticket.get('depends_on', [])
        if deps:
            imgui.text_colored(C_LBL, f"Depends on: {', '.join(deps)}")
        stream_key = f"Tier 3: {tid}"
        if stream_key in self.mma_streams:
            imgui.separator()
            imgui.text_colored(C_KEY, "Worker Stream:")
            imgui.text_wrapped(self.mma_streams[stream_key])
        imgui.end_tooltip()
    imgui.same_line()
    imgui.text_colored(C_KEY, tid)
    imgui.same_line(150)
    imgui.text_disabled(str(target))
    imgui.same_line(400)
    imgui.text_colored(status_color, status)
    imgui.same_line(500)
    if imgui.button(f"Retry##{tid}"):
        self._cb_ticket_retry(tid)
    imgui.same_line()
    if imgui.button(f"Skip##{tid}"):
        self._cb_ticket_skip(tid)
    if status in ['TODO', 'BLOCKED']:
        imgui.same_line()
        if imgui.button(f"Delete##{tid}"):
            # Drop the ticket and remove it from every remaining dependency list.
            self.active_tickets = [t for t in self.active_tickets if t.get('id') != tid]
            for t in self.active_tickets:
                deps = t.get('depends_on', [])
                if tid in deps:
                    t['depends_on'] = [d for d in deps if d != tid]
            self._push_mma_state_update()
    if is_duplicate:
        imgui.same_line()
        imgui.text_disabled("(shown above)")
    if node_open:
        # Children are expanded only on the first occurrence of a ticket.
        if not is_duplicate:
            for child_id in children:
                child = tickets_by_id.get(child_id)
                if child:
                    self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered)
        # Pop exactly when tree_node_ex reported an open node -- this covers
        # leaf and duplicate nodes too, keeping the tree stack balanced.
        imgui.tree_pop()
def _render_comms_history_panel(self) -> None:
# Render the comms history panel: a status/toolbar row, a color legend, and a
# scrolling list of comms entries taken either from the live log or from a
# previously loaded session.
imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
imgui.same_line()
# Clearing wipes both the ai_client-side log and this object's own copy.
if imgui.button("Clear##comms"):
ai_client.clear_comms_log()
self._comms_log.clear()
imgui.same_line()
if imgui.button("Load Log"):
self.cb_load_prior_log()
# Extra controls shown while a prior session is being viewed.
if self.is_viewing_prior_session:
imgui.same_line()
# Leaving the prior session drops its entries and resets the status text to "idle".
if imgui.button("Exit Prior Session"):
self.is_viewing_prior_session = False
self.prior_session_entries.clear()
self.ai_status = "idle"
imgui.separator()
imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
imgui.separator()
# Static legend: each direction/kind label is drawn in the color constant named for it.
imgui.text_colored(C_OUT, "OUT")
imgui.same_line()
imgui.text_colored(C_REQ, "request")
imgui.same_line()
imgui.text_colored(C_TC, "tool_call")
imgui.same_line()
imgui.text(" ")
imgui.same_line()
imgui.text_colored(C_IN, "IN")
imgui.same_line()
imgui.text_colored(C_RES, "response")
imgui.same_line()
imgui.text_colored(C_TR, "tool_result")
imgui.separator()
# Use a tinted child background while viewing a prior session; the matching
# pop_style_color() is issued after end_child() under the same condition.
if self.is_viewing_prior_session:
imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20))
imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)
# Prior-session entries are iterated directly; the live log is copied into a new
# list first (presumably so the loop works on a snapshot -- confirm against the writers).
log_to_render = self.prior_session_entries if self.is_viewing_prior_session else list(self._comms_log)
for idx_minus_one, entry in enumerate(log_to_render):
# 1-based index, used both for the displayed "#N" and for the ImGui ID scope.
idx = idx_minus_one + 1
local_ts = entry.get("local_ts", 0)
# Blink effect: for the first 3 seconds after local_ts (live log only) the alpha
# is a linear fade from 0.3 down to 0, multiplied by a sine pulse in [0, 1].
blink_alpha = 0.0
if local_ts > 0 and not self.is_viewing_prior_session:
elapsed = time.time() - local_ts
if elapsed < 3.0:
blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5)
imgui.push_id(f"comms_{idx}")
if blink_alpha > 0:
# Intended: draw a background highlight behind the entry.
# NOTE(review): draw_list and p_min are fetched here but never used afterwards
# in this method.
draw_list = imgui.get_window_draw_list()
p_min = imgui.get_cursor_screen_pos()
# The entry height is not known at this point, so no rectangle is drawn;
# a style color is pushed instead and popped again after the entry.
# NOTE(review): Col_.child_bg styles child-window backgrounds -- confirm that
# this push actually tints the group opened below.
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha))
# Group wrapping the entry; closed by end_group() in the blink_alpha check after the entry.
imgui.begin_group()
# Header line: index, timestamp, direction, kind and provider/model.
d = entry.get("direction", "IN")
k = entry.get("kind", "response")
imgui.text_colored(vec4(160, 160, 160), f"#{idx}")
imgui.same_line()
imgui.text_colored(vec4(160, 160, 160), entry.get("ts", "00:00:00"))
imgui.same_line()
imgui.text_colored(DIR_COLORS.get(d, C_VAL), d)
imgui.same_line()
imgui.text_colored(KIND_COLORS.get(k, C_VAL), k)
imgui.same_line()
imgui.text_colored(C_LBL, f"{entry.get('provider', '?')}/{entry.get('model', '?')}")
# The payload layout depends on the entry kind; every field is read defensively.
payload = entry.get("payload", {})
if k == "request":
self._render_heavy_text("message", payload.get("message", ""))
elif k == "response":
imgui.text_colored(C_LBL, "round:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("round", "")))
imgui.text_colored(C_LBL, "stop_reason:")
imgui.same_line()
imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", "")))
text = payload.get("text", "")
if text: self._render_heavy_text("text", text)
imgui.text_colored(C_LBL, "tool_calls:")
tcs = payload.get("tool_calls", [])
if not tcs: imgui.text_colored(C_VAL, " (none)")
for tc_i, tc in enumerate(tcs):
imgui.text_colored(C_KEY, f" call[{tc_i}] {tc.get('name', '?')}")
if tc.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(tc["id"]))
# Arguments may be stored under "args" or "input"; a falsy "args" falls back to "input".
if "args" in tc or "input" in tc:
self._render_heavy_text(f"call_{tc_i}_args", str(tc.get("args") or tc.get("input")))
elif k == "tool_call":
imgui.text_colored(C_KEY, payload.get("name", "?"))
if payload.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "script" in payload: self._render_heavy_text("script", payload["script"])
if "args" in payload: self._render_heavy_text("args", str(payload["args"]))
elif k == "tool_result":
imgui.text_colored(C_KEY, payload.get("name", "?"))
if payload.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "output" in payload: self._render_heavy_text("output", payload["output"])
if "results" in payload:
for r_i, r in enumerate(payload["results"]):
imgui.text_colored(C_LBL, f" Result[{r_i}]:")
self._render_heavy_text(f"res_{r_i}", str(r))
# Token usage summary, shown when the payload carries a "usage" mapping; the
# cache figure is appended only when cache_read_input_tokens is truthy.
if "usage" in payload:
u = payload["usage"]
u_str = f"In: {u.get('input_tokens', 0)} Out: {u.get('output_tokens', 0)}"
if u.get("cache_read_input_tokens"): u_str += f" (Cache: {u['cache_read_input_tokens']})"
imgui.text_colored(C_SUB, f" Usage: {u_str}")
imgui.separator()
# Close the group and pop the style color pushed for the blink highlight.
if blink_alpha > 0:
imgui.end_group()
imgui.pop_style_color()
imgui.pop_id()
# One-shot request to scroll to the newest entry; the flag is cleared once honored.
if self._scroll_comms_to_bottom:
imgui.set_scroll_here_y(1.0)
self._scroll_comms_to_bottom = False
imgui.end_child()
# Undo the prior-session background tint pushed before begin_child().
if self.is_viewing_prior_session:
imgui.pop_style_color()
def _render_system_prompts_panel(self) -> None:
    """Draw the multiline editors for the global and per-project system prompts.

    Each editor writes its current text back to the matching ``ui_*`` attribute
    every frame; the "changed" flag returned by ImGui is not needed here.
    """
    # Width -1 with a fixed 100 px height, shared by both editors.
    editor_size = imgui.ImVec2(-1, 100)

    imgui.text("Global System Prompt (all projects)")
    _, self.ui_global_system_prompt = imgui.input_text_multiline(
        "##gsp",
        self.ui_global_system_prompt,
        editor_size,
    )

    imgui.separator()

    imgui.text("Project System Prompt")
    _, self.ui_project_system_prompt = imgui.input_text_multiline(
        "##psp",
        self.ui_project_system_prompt,
        editor_size,
    )
def _render_theme_panel(self) -> None:
    """Draw the "Theme" window: palette picker, font path/size and UI scale.

    Palette and scale changes are applied through the ``theme`` module at once.
    Font path and size are only stored on the ``theme`` module; the
    "Apply Font" button saves the configuration, and the new font takes effect
    after a restart (as the button label and status message state).
    """
    exp, self.show_windows["Theme"] = imgui.begin("Theme", self.show_windows["Theme"])
    if exp:
        # --- Palette -------------------------------------------------------
        imgui.text("Palette")
        cp = theme.get_current_palette()
        if imgui.begin_combo("##pal", cp):
            for palette_name in theme.get_palette_names():
                if imgui.selectable(palette_name, palette_name == cp)[0]:
                    theme.apply(palette_name)
            imgui.end_combo()
        imgui.separator()

        # --- Font path -----------------------------------------------------
        imgui.text("Font")
        imgui.push_item_width(-150)
        ch, path = imgui.input_text("##fontp", theme.get_current_font_path())
        imgui.pop_item_width()
        if ch:
            theme._current_font_path = path
        imgui.same_line()
        if imgui.button("Browse##font"):
            tk_root = hide_tk_root()
            try:
                chosen = filedialog.askopenfilename(
                    filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")]
                )
            finally:
                # Always tear down the hidden Tk root, even if the dialog raises,
                # so a failed dialog does not leak a Tk root.
                tk_root.destroy()
            # A cancelled dialog yields a falsy value; keep the current path then.
            if chosen:
                theme._current_font_path = chosen

        # --- Font size -----------------------------------------------------
        imgui.text("Size (px)")
        imgui.same_line()
        imgui.push_item_width(100)
        ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f")
        if ch:
            theme._current_font_size = size
        imgui.pop_item_width()
        imgui.same_line()
        if imgui.button("Apply Font (Requires Restart)"):
            self._flush_to_config()
            save_config(self.config)
            self.ai_status = "Font settings saved. Restart required."
        imgui.separator()

        # --- UI scale ------------------------------------------------------
        imgui.text("UI Scale (DPI)")
        ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f")
        if ch:
            theme.set_scale(scale)
    # end() must pair with begin() regardless of what begin() returned.
    imgui.end()
def _load_fonts(self) -> None:
    """Load the configured font through hello_imgui, if one is set and present.

    Does nothing when the theme reports no font path or when the path does not
    exist on disk.
    """
    font_path, font_size = theme.get_font_loading_params()
    if not font_path:
        return
    if not Path(font_path).exists():
        return
    hello_imgui.load_font(font_path, font_size)
def _post_init(self) -> None:
    """Apply the currently selected theme; registered as the runner's post_init callback."""
    theme.apply_current()
def run(self) -> None:
"""Initializes the ImGui runner and starts the main application loop."""
# Two modes: with "--headless" on the command line the app serves its API via
# uvicorn and no window is created; otherwise the hello_imgui GUI is started.
if "--headless" in sys.argv:
print("Headless mode active")
self._fetch_models(self.current_provider)
# uvicorn is imported here rather than at module top, so it is only needed
# when headless mode is actually used.
import uvicorn
# Port comes from the [headless] config table and defaults to 8000.
headless_cfg = self.config.get("headless", {})
port = headless_cfg.get("port", 8000)
api = self.create_api()
# NOTE(review): host "0.0.0.0" listens on all network interfaces -- confirm
# that exposing the API beyond localhost is intended.
uvicorn.run(api, host="0.0.0.0", port=port)
else:
theme.load_from_config(self.config)
# Window and docking setup for the hello_imgui runner.
self.runner_params = hello_imgui.RunnerParams()
self.runner_params.app_window_params.window_title = "manual slop"
self.runner_params.app_window_params.window_geometry.size = (1680, 1200)
self.runner_params.imgui_window_params.enable_viewports = False
self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
# FPS idling is switched off.
self.runner_params.fps_idling.enable_idling = False
self.runner_params.imgui_window_params.show_menu_bar = True
# The layout ini file is kept in the current folder under a fixed name.
self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder
self.runner_params.ini_filename = "manualslop_layout.ini"
# Hook this object's methods into the runner's callbacks.
self.runner_params.callbacks.show_gui = self._gui_func
self.runner_params.callbacks.show_menus = self._show_menus
self.runner_params.callbacks.load_additional_fonts = self._load_fonts
self.runner_params.callbacks.post_init = self._post_init
self._fetch_models(self.current_provider)
# Start API hooks server (if enabled)
self.hook_server = api_hooks.HookServer(self)
self.hook_server.start()
# The statements after this call form the shutdown sequence (see "On exit").
immapp.run(self.runner_params)
# On exit: stop the hook server and performance monitor, release AI client
# resources, persist project and config state, then close the session log.
self.hook_server.stop()
self.perf_monitor.stop()
ai_client.cleanup() # Destroy active API caches to stop billing
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
session_logger.close_session()
def main() -> None:
    """Create the application object and hand control to its run loop."""
    App().run()
# Launch the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()