Files
manual_slop/gui_2.py

3337 lines
150 KiB
Python

# gui_2.py
import tomli_w
import threading
import asyncio
import time
import math
import json
import sys
import os
import uuid
import requests
from pathlib import Path
from tkinter import filedialog, Tk
from typing import Optional, Callable
import aggregate
import ai_client
from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import theme_2 as theme
import tomllib
import events
import numpy as np
import api_hooks
import mcp_client
import orchestrator_pm
from performance_monitor import PerformanceMonitor
from log_registry import LogRegistry
from log_pruner import LogPruner
import conductor_tech_lead
import multi_agent_conductor
from models import Track, Ticket
from file_cache import ASTParser
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from imgui_bundle import imgui, hello_imgui, immapp
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic", "gemini_cli", "deepseek"]
COMMS_CLAMP_CHARS = 300
def load_config() -> dict:
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
def save_config(config: dict):
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
def hide_tk_root() -> Tk:
root = Tk()
root.withdraw()
root.wm_attributes("-topmost", True)
return root
# Color Helpers
def vec4(r, g, b, a=1.0): return imgui.ImVec4(r/255, g/255, b/255, a)
C_OUT = vec4(100, 200, 255)
C_IN = vec4(140, 255, 160)
C_REQ = vec4(255, 220, 100)
C_RES = vec4(180, 255, 180)
C_TC = vec4(255, 180, 80)
C_TR = vec4(180, 220, 255)
C_TRS = vec4(200, 180, 255)
C_LBL = vec4(180, 180, 180)
C_VAL = vec4(220, 220, 220)
C_KEY = vec4(140, 200, 255)
C_NUM = vec4(180, 255, 180)
C_SUB = vec4(220, 200, 120)
DIR_COLORS = {"OUT": C_OUT, "IN": C_IN}
KIND_COLORS = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
HEAVY_KEYS = {"message", "text", "script", "output", "content"}
DISC_ROLES = ["User", "AI", "Vendor API", "System"]
AGENT_TOOL_NAMES = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]
def truncate_entries(entries: list[dict], max_pairs: int) -> list[dict]:
if max_pairs <= 0:
return []
target_count = max_pairs * 2
if len(entries) <= target_count:
return entries
return entries[-target_count:]
def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict]:
known = roles if roles is not None else DISC_ROLES
entries = []
for raw in history:
entries.append(project_manager.str_to_entry(raw, known))
return entries
class ConfirmDialog:
def __init__(self, script: str, base_dir: str):
self._uid = str(uuid.uuid4())
self._script = str(script) if script is not None else ""
self._base_dir = str(base_dir) if base_dir is not None else ""
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return self._approved, self._script
class MMAApprovalDialog:
def __init__(self, ticket_id: str, payload: str):
self._ticket_id = ticket_id
self._payload = payload
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return self._approved, self._payload
class MMASpawnApprovalDialog:
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str):
self._ticket_id = ticket_id
self._role = role
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
self._done = False
self._approved = False
self._abort = False
def wait(self) -> dict:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
'abort': self._abort,
'prompt': self._prompt,
'context_md': self._context_md
}
class App:
"""The main ImGui interface orchestrator for Manual Slop."""
def __init__(self):
self.config = load_config()
self.event_queue = events.AsyncEventQueue()
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
ai_cfg = self.config.get("ai", {})
self._current_provider: str = ai_cfg.get("provider", "gemini")
self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
self.available_models: list[str] = []
self.temperature: float = ai_cfg.get("temperature", 0.0)
self.max_tokens: int = ai_cfg.get("max_tokens", 8192)
self.history_trunc_limit: int = ai_cfg.get("history_trunc_limit", 8000)
projects_cfg = self.config.get("projects", {})
self.project_paths: list[str] = list(projects_cfg.get("paths", []))
self.active_project_path: str = projects_cfg.get("active", "")
self.project: dict = {}
self.active_discussion: str = "main"
self._load_active_project()
# Project-derived state
self.files: list[str] = list(self.project.get("files", {}).get("paths", []))
self.screenshots: list[str] = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
self.disc_entries: list[dict] = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
# UI State Variables
self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".")
proj_meta = self.project.get("project", {})
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_word_wrap = proj_meta.get("word_wrap", True)
self.ui_summary_only = proj_meta.get("summary_only", False)
self.ui_auto_add_history = disc_sec.get("auto_add", False)
self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "")
self.ui_ai_input = ""
self.ui_disc_new_name_input = ""
self.ui_disc_new_role_input = ""
self.ui_epic_input = ""
self.proposed_tracks = []
self._show_track_proposal_modal = False
# Last Script popup variables
self.ui_last_script_text = ""
self.ui_last_script_output = ""
self.ai_status = "idle"
self.ai_response = ""
self.last_md = ""
self.last_md_path: Path | None = None
self.last_file_items: list = []
self.send_thread: threading.Thread | None = None
self._send_thread_lock = threading.Lock()
self.models_thread: threading.Thread | None = None
_default_windows = {
"Context Hub": True,
"Files & Media": True,
"AI Settings": True,
"MMA Dashboard": True,
"Discussion Hub": True,
"Operations Hub": True,
"Theme": True,
"Log Management": False,
"Diagnostics": False,
}
saved = self.config.get("gui", {}).get("show_windows", {})
self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()}
self.show_script_output = False
self.show_text_viewer = False
self.text_viewer_title = ""
self.text_viewer_content = ""
self._pending_dialog: ConfirmDialog | None = None
self._pending_dialog_open = False
self._pending_dialog_lock = threading.Lock()
self._pending_actions: dict[str, ConfirmDialog] = {}
# Ask-related state (for tool approvals from CLI)
self._pending_ask_dialog = False
self._ask_dialog_open = False
self._ask_request_id = None
self._ask_tool_data = None
# MMA State
self.mma_step_mode = False
self.active_track = None
self.active_tickets = []
self.active_tier = None # "Tier 1", "Tier 2", etc.
self.mma_status = "idle"
# MMA-specific approval state
self._pending_mma_approval = None
self._mma_approval_open = False
self._mma_approval_edit_mode = False
self._mma_approval_payload = ""
# MMA Spawn approval state
self._pending_mma_spawn = None
self._mma_spawn_open = False
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = ''
self._mma_spawn_context = ''
# Orchestration State
self.ui_epic_input = ""
self.proposed_tracks: list[dict] = []
self._show_track_proposal_modal = False
self.mma_tier_usage = {
"Tier 1": {"input": 0, "output": 0},
"Tier 2": {"input": 0, "output": 0},
"Tier 3": {"input": 0, "output": 0},
"Tier 4": {"input": 0, "output": 0},
}
self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = []
self._pending_comms: list[dict] = []
self._pending_comms_lock = threading.Lock()
self._pending_tool_calls: list[tuple[str, str]] = []
self._pending_tool_calls_lock = threading.Lock()
self._pending_history_adds: list[dict] = []
self._pending_history_adds_lock = threading.Lock()
# Blinking
self._trigger_blink = False
self._is_blinking = False
self._blink_start_time = 0.0
self._trigger_script_blink = False
self._is_script_blinking = False
self._script_blink_start_time = 0.0
self._scroll_disc_to_bottom = False
self._scroll_comms_to_bottom = False
self._scroll_tool_calls_to_bottom = False
# GUI Task Queue (thread-safe, for event handlers and hook server)
self._pending_gui_tasks: list[dict] = []
self._pending_gui_tasks_lock = threading.Lock()
# Session usage tracking
self.session_usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
# Token budget / cache telemetry
self._token_budget_pct = 0.0
self._token_budget_current = 0
self._token_budget_limit = 0
self._gemini_cache_text = ""
# Discussion truncation
self.ui_disc_truncate_pairs: int = 2
self.ui_auto_scroll_comms = True
self.ui_auto_scroll_tool_calls = True
# Agent tools config
agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks: list[dict] = []
self.mma_streams: dict[str, str] = {}
# Prior session log viewing
self.is_viewing_prior_session = False
self.prior_session_entries: list[dict] = []
# API Hooks
self.test_hooks_enabled = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
# Performance monitoring
self.perf_monitor = PerformanceMonitor()
self.perf_history = {"frame_time": [0.0]*100, "fps": [0.0]*100, "cpu": [0.0]*100, "input_lag": [0.0]*100}
self._perf_last_update = 0.0
# Auto-save timer (every 60s)
self._autosave_interval = 60.0
self._last_autosave = time.time()
label = self.project.get("project", {}).get("name", "")
session_logger.open_session(label=label)
self._prune_old_logs()
self._init_ai_and_hooks()
def _prune_old_logs(self):
"""Asynchronously prunes old insignificant logs on startup."""
def run_prune():
try:
registry = LogRegistry("logs/log_registry.toml")
pruner = LogPruner(registry, "logs")
pruner.prune()
except Exception as e:
print(f"Error during log pruning: {e}")
thread = threading.Thread(target=run_prune, daemon=True)
thread.start()
@property
def current_provider(self):
return self._current_provider
@current_provider.setter
def current_provider(self, value):
if value != self._current_provider:
self._current_provider = value
ai_client.reset_session()
ai_client.set_provider(value, self.current_model)
if value == "gemini_cli":
# Ensure the adapter is initialized with the current path
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
# Start hook server if not already running (required for bridge)
if hasattr(self, 'hook_server'):
self.hook_server.start()
self.available_models = []
self._fetch_models(value)
@property
def current_model(self):
return self._current_model
@current_model.setter
def current_model(self, value):
if value != self._current_model:
self._current_model = value
ai_client.reset_session()
ai_client.set_provider(self.current_provider, value)
def _init_ai_and_hooks(self):
ai_client.set_provider(self.current_provider, self.current_model)
if self.current_provider == "gemini_cli":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
self.perf_monitor.alert_callback = self._on_performance_alert
# AI client event subscriptions
ai_client.events.on("request_start", self._on_api_event)
ai_client.events.on("response_received", self._on_api_event)
ai_client.events.on("tool_execution", self._on_api_event)
# Mappings for safe hook execution
self._settable_fields = {
'ai_input': 'ui_ai_input',
'project_git_dir': 'ui_project_git_dir',
'auto_add_history': 'ui_auto_add_history',
'disc_new_name_input': 'ui_disc_new_name_input',
'project_main_context': 'ui_project_main_context',
'gcli_path': 'ui_gemini_cli_path',
'output_dir': 'ui_output_dir',
'files_base_dir': 'ui_files_base_dir',
'ai_status': 'ai_status',
'ai_response': 'ai_response',
'active_discussion': 'active_discussion',
'current_provider': 'current_provider',
'current_model': 'current_model',
'token_budget_pct': '_token_budget_pct',
'token_budget_current': '_token_budget_current',
'token_budget_label': '_token_budget_label',
'show_confirm_modal': 'show_confirm_modal',
'mma_epic_input': 'ui_epic_input',
'mma_status': 'mma_status',
'mma_active_tier': 'active_tier'
}
self._clickable_actions = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
}
self._predefined_callbacks = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file
}
# Caching
self._discussion_names_cache = []
self._discussion_names_dirty = True
def create_api(self) -> FastAPI:
"""Creates and configures the FastAPI application for headless mode."""
api = FastAPI(title="Manual Slop Headless API")
class GenerateRequest(BaseModel):
prompt: str
auto_add_history: bool = True
temperature: float | None = None
max_tokens: int | None = None
class ConfirmRequest(BaseModel):
approved: bool
API_KEY_NAME = "X-API-KEY"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def get_api_key(header_key: str = Depends(api_key_header)):
"""Validates the API key from the request header against configuration."""
headless_cfg = self.config.get("headless", {})
config_key = headless_cfg.get("api_key", "").strip()
env_key = os.environ.get("SLOP_API_KEY", "").strip()
target_key = env_key or config_key
if not target_key:
# If no key is configured, we must deny access by default for security
raise HTTPException(status_code=403, detail="API Key not configured on server")
if header_key == target_key:
return header_key
raise HTTPException(status_code=403, detail="Could not validate API Key")
@api.get("/health")
def health():
"""Basic health check endpoint."""
return {"status": "ok"}
@api.get("/status", dependencies=[Depends(get_api_key)])
def status():
"""Returns the current status of the AI provider and active project."""
return {
"provider": self.current_provider,
"model": self.current_model,
"active_project": self.active_project_path,
"ai_status": self.ai_status,
"session_usage": self.session_usage
}
@api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
def pending_actions():
"""Lists all PowerShell scripts awaiting manual confirmation."""
actions = []
with self._pending_dialog_lock:
# Include multi-actions from headless mode
for uid, dialog in self._pending_actions.items():
actions.append({
"action_id": uid,
"script": dialog._script,
"base_dir": dialog._base_dir
})
# Include single active dialog from GUI mode
if self._pending_dialog:
actions.append({
"action_id": self._pending_dialog._uid,
"script": self._pending_dialog._script,
"base_dir": self._pending_dialog._base_dir
})
return actions
@api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
def confirm_action(action_id: str, req: ConfirmRequest):
"""Approves or denies a pending PowerShell script execution."""
success = self.resolve_pending_action(action_id, req.approved)
if not success:
raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found")
return {"status": "success", "action_id": action_id, "approved": req.approved}
@api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
def list_sessions():
"""Lists all available session log files."""
log_dir = Path("logs")
if not log_dir.exists():
return []
return sorted([f.name for f in log_dir.glob("*.log")], reverse=True)
@api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def get_session(filename: str):
"""Retrieves the content of a specific session log file."""
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
log_path = Path("logs") / filename
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Session log not found")
try:
content = log_path.read_text(encoding="utf-8")
return {"filename": filename, "content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def delete_session(filename: str):
"""Deletes a specific session log file."""
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
log_path = Path("logs") / filename
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Session log not found")
try:
log_path.unlink()
return {"status": "success", "message": f"Deleted {filename}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
def get_context():
"""Returns the current file and screenshot context configuration."""
return {
"files": self.files,
"screenshots": self.screenshots,
"files_base_dir": self.ui_files_base_dir,
"screenshots_base_dir": self.ui_shots_base_dir
}
@api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
def generate(req: GenerateRequest):
"""Triggers an AI generation request using the current project context."""
if not req.prompt.strip():
raise HTTPException(status_code=400, detail="Prompt cannot be empty")
with self._send_thread_lock:
start_time = time.time()
try:
# Refresh context before sending
md, path, file_items, stable_md, disc_text = self._do_generate()
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}")
user_msg = req.prompt
base_dir = self.ui_files_base_dir
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
# Override parameters if provided in request, otherwise use GUI defaults
temp = req.temperature if req.temperature is not None else self.temperature
tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens
ai_client.set_model_params(temp, tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": user_msg,
"collapsed": False,
"ts": project_manager.now_ts()
})
try:
resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "AI",
"content": resp,
"collapsed": False,
"ts": project_manager.now_ts()
})
# Ensure metrics are updated for the response
self._recalculate_session_usage()
duration = time.time() - start_time
return {
"text": resp,
"metadata": {
"provider": self.current_provider,
"model": self.current_model,
"duration_sec": round(duration, 3),
"timestamp": project_manager.now_ts()
},
"usage": self.session_usage
}
except ProviderError as e:
# Specific error handling for vendor issues (4xx/5xx from Gemini/Anthropic)
raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}")
except Exception as e:
# Generic internal error
raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}")
@api.post("/api/v1/stream", dependencies=[Depends(get_api_key)])
async def stream(req: GenerateRequest):
"""Placeholder for streaming AI generation responses (Not yet implemented)."""
# Streaming implementation would require ai_client to support yield-based responses.
# Currently added as a placeholder to satisfy spec requirements.
raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.")
return api
# ---------------------------------------------------------------- project loading
def _cb_new_project_automated(self, user_data):
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
def _cb_disc_create(self):
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _load_active_project(self):
if self.active_project_path and Path(self.active_project_path).exists():
try:
self.project = project_manager.load_project(self.active_project_path)
return
except Exception as e:
print(f"Failed to load project {self.active_project_path}: {e}")
for pp in self.project_paths:
if Path(pp).exists():
try:
self.project = project_manager.load_project(pp)
self.active_project_path = pp
return
except Exception:
continue
self.project = project_manager.migrate_from_legacy_config(self.config)
name = self.project.get("project", {}).get("name", "project")
fallback_path = f"{name}.toml"
project_manager.save_project(self.project, fallback_path)
self.active_project_path = fallback_path
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _switch_project(self, path: str):
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
self._refresh_from_project()
self._discussion_names_dirty = True
ai_client.reset_session()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self):
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
proj = self.project
self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".")
self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "")
self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "")
self.ui_project_main_context = proj.get("project", {}).get("main_context", "")
self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True)
self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True)
self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True)
self.ui_summary_only = proj.get("project", {}).get("summary_only", False)
agent_tools_cfg = proj.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
# Restore MMA state
mma_sec = proj.get("mma", {})
self.ui_epic_input = mma_sec.get("epic", "")
at_data = mma_sec.get("active_track")
if at_data:
try:
tickets = []
for t_data in at_data.get("tickets", []):
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=at_data.get("id"),
description=at_data.get("description"),
tickets=tickets
)
self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table
except Exception as e:
print(f"Failed to deserialize active track: {e}")
self.active_track = None
else:
self.active_track = None
self.active_tickets = []
# Load track-scoped history if track is active
if self.active_track:
track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
if track_history:
self.disc_entries = _parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str):
state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
if state:
try:
# Convert list[Ticket] or list[dict] to list[Ticket] for Track object
tickets = []
for t in state.tasks:
if isinstance(t, dict):
tickets.append(Ticket(**t))
else:
tickets.append(t)
self.active_track = Track(
id=state.metadata.id,
description=state.metadata.name,
tickets=tickets
)
# Keep dicts for UI table (or convert Ticket objects back to dicts if needed)
from dataclasses import asdict
self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets]
# Load track-scoped history
history = project_manager.load_track_history(track_id, self.ui_files_base_dir)
if history:
self.disc_entries = _parse_history_entries(history, self.disc_roles)
else:
self.disc_entries = []
self._recalculate_session_usage()
self.ai_status = f"Loaded track: {state.metadata.name}"
except Exception as e:
self.ai_status = f"Load track error: {e}"
print(f"Error loading track {track_id}: {e}")
def _save_active_project(self):
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self.ai_status = f"save error: {e}"
# ---------------------------------------------------------------- discussion management
def _get_discussion_names(self) -> list[str]:
if self._discussion_names_dirty:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
self._discussion_names_cache = sorted(discussions.keys())
self._discussion_names_dirty = False
return self._discussion_names_cache
def _switch_discussion(self, name: str):
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if name not in discussions:
self.ai_status = f"discussion not found: {name}"
return
self.active_discussion = name
disc_sec["active"] = name
self._discussion_names_dirty = True
disc_data = discussions[name]
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
self.ai_status = f"discussion: {name}"
def _flush_disc_entries_to_project(self):
history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries]
if self.active_track:
project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir)
return
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion())
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str):
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self.ai_status = f"discussion '{name}' already exists"
return
discussions[name] = project_manager.default_discussion()
self._discussion_names_dirty = True
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self.ai_status = f"discussion '{new_name}' already exists"
return
discussions[new_name] = discussions.pop(old_name)
self._discussion_names_dirty = True
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
self.ai_status = "cannot delete the last discussion"
return
if name not in discussions:
return
del discussions[name]
self._discussion_names_dirty = True
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
# ---------------------------------------------------------------- logic
def _on_comms_entry(self, entry: dict):
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
# If this is a history_add kind, route it to history queue instead
if entry.get("kind") == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str):
session_logger.log_tool_call(script, result, None)
with self._pending_tool_calls_lock:
self._pending_tool_calls.append((script, result, time.time()))
def _on_api_event(self, *args, **kwargs):
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
def _on_performance_alert(self, message: str):
"""Called by PerformanceMonitor when a threshold is exceeded."""
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
# Inject into history as a 'System' message
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "System",
"content": alert_text,
"ts": project_manager.now_ts()
})
def _process_pending_gui_tasks(self):
if not self._pending_gui_tasks:
return
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}))
elif action == "handle_ai_response":
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
if stream_id:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_track = payload.get("track")
self.active_tickets = payload.get("tickets", [])
elif action == "set_value":
item = task.get("item")
value = task.get("value")
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
if item == "gcli_path":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=value)
else:
ai_client._gemini_cli_adapter.binary_path = value
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item in self._clickable_actions:
# Check if it's a method that accepts user_data
import inspect
func = self._clickable_actions[item]
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
except Exception:
func()
elif action == "select_list_item":
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(value)
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload"))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == "mma_spawn_approval":
dlg = MMASpawnApprovalDialog(
task.get("ticket_id"),
task.get("role"),
task.get("prompt"),
task.get("context_md")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
def _handle_approve_script(self):
"""Logic for approving a pending script via API hooks."""
print("[DEBUG] _handle_approve_script called")
with self._pending_dialog_lock:
if self._pending_dialog:
print(f"[DEBUG] Approving dialog for: {self._pending_dialog._script[:50]}...")
with self._pending_dialog._condition:
self._pending_dialog._approved = True
self._pending_dialog._done = True
self._pending_dialog._condition.notify_all()
self._pending_dialog = None
else:
print("[DEBUG] No pending dialog to approve")
def _handle_reject_script(self):
"""Logic for rejecting a pending script via API hooks."""
print("[DEBUG] _handle_reject_script called")
with self._pending_dialog_lock:
if self._pending_dialog:
print(f"[DEBUG] Rejecting dialog for: {self._pending_dialog._script[:50]}...")
with self._pending_dialog._condition:
self._pending_dialog._approved = False
self._pending_dialog._done = True
self._pending_dialog._condition.notify_all()
self._pending_dialog = None
else:
print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None):
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
if self._pending_mma_spawn:
dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
dlg._abort = abort
if prompt is not None:
dlg._prompt = prompt
if context_md is not None:
dlg._context_md = context_md
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
def do_post():
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": True}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self):
"""Responds with rejection for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
def do_post():
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": False}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self):
"""Logic for resetting the AI session."""
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.disc_entries.clear()
# Clear history in project dict too
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if self.active_discussion in discussions:
discussions[self.active_discussion]["history"] = []
self.ai_status = "session reset"
self.ai_response = ""
self.ui_ai_input = ""
def _handle_md_only(self):
"""Logic for the 'MD Only' action."""
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
# Refresh token budget metrics with CURRENT md
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self.ai_status = f"error: {e}"
def _handle_generate_send(self):
"""Logic for the 'Gen + Send' action."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
return
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
asyncio.run_coroutine_threadsafe(
self.event_queue.put("user_request", event_payload),
self._loop
)
def _run_event_loop(self):
"""Runs the internal asyncio event loop."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
self._loop.run_forever()
def shutdown(self):
"""Cleanly shuts down the app's background tasks."""
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._loop_thread.is_alive():
self._loop_thread.join(timeout=2.0)
# Join other threads if they exist
if self.send_thread and self.send_thread.is_alive():
self.send_thread.join(timeout=1.0)
if self.models_thread and self.models_thread.is_alive():
self.models_thread.join(timeout=1.0)
async def _process_event_queue(self):
"""Listens for and processes events from the AsyncEventQueue."""
while True:
event_name, payload = await self.event_queue.get()
if event_name == "user_request":
# Handle the request (simulating what was previously in do_send thread)
self._handle_request_event(payload)
elif event_name == "response":
# Handle AI response event
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": payload
})
elif event_name == "mma_state_update":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_state_update",
"payload": payload
})
def _handle_request_event(self, event: events.UserRequestEvent):
"""Processes a UserRequestEvent by calling the AI client."""
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": event.prompt,
"collapsed": False,
"ts": project_manager.now_ts()
})
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
try:
resp = ai_client.send(event.stable_md, event.prompt, event.base_dir, event.file_items, event.disc_text)
# Emit response event
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": resp, "status": "done"}),
self._loop
)
except ProviderError as e:
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}),
self._loop
)
except Exception as e:
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}),
self._loop
)
def _test_callback_func_write_to_file(self, data: str):
"""A dummy function that a custom_callback would execute for testing."""
# Note: This file path is relative to where the test is run.
# This is for testing purposes only.
with open("temp_callback_output.txt", "w") as f:
f.write(data)
def _recalculate_session_usage(self):
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]:
if k in usage:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict, md_content: str | None = None):
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
def fetch_stats():
try:
stats = ai_client.get_history_bleed_stats(md_content=md_content or self.last_md)
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
self._token_budget_current = stats.get("current", 0)
self._token_budget_limit = stats.get("limit", 0)
except Exception:
pass
threading.Thread(target=fetch_stats, daemon=True).start()
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def cb_load_prior_log(self):
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
initialdir="logs",
filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")]
)
root.destroy()
if not path:
return
entries = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
self.ai_status = f"log load error: {e}"
return
self.prior_session_entries = entries
self.is_viewing_prior_session = True
self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)"
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str | None:
print(f"[DEBUG] _confirm_and_run triggered for script length: {len(script)}")
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
print(f"[PENDING_ACTION] Created action {dialog._uid}")
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
# Notify API hook subscribers
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
print("[DEBUG] Pushing script_confirmation_required event to queue")
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
approved, final_script = dialog.wait()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
print(f"[DEBUG] _confirm_and_run result: approved={approved}")
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
print(f"[DEBUG] Running powershell in {base_dir}")
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def resolve_pending_action(self, action_id: str, approved: bool):
"""Resolves a pending PowerShell script confirmation by its ID.
Args:
action_id: The unique identifier for the pending action.
approved: True if the script should be executed, False otherwise.
Returns:
bool: True if the action was found and resolved, False otherwise.
"""
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
elif self._pending_dialog and self._pending_dialog._uid == action_id:
dialog = self._pending_dialog
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
return False
def _append_tool_log(self, script: str, result: str):
self._tool_log.append((script, result, time.time()))
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def _flush_to_project(self):
proj = self.project
proj.setdefault("output", {})["output_dir"] = self.ui_output_dir
proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir
proj["files"]["paths"] = self.files
proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir
proj["screenshots"]["paths"] = self.screenshots
proj.setdefault("project", {})
proj["project"]["git_dir"] = self.ui_project_git_dir
proj["project"]["system_prompt"] = self.ui_project_system_prompt
proj["project"]["main_context"] = self.ui_project_main_context
proj["project"]["word_wrap"] = self.ui_word_wrap
proj["project"]["summary_only"] = self.ui_summary_only
proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms
proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls
proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in AGENT_TOOL_NAMES:
proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True)
self._flush_disc_entries_to_project()
disc_sec = proj.setdefault("discussion", {})
disc_sec["roles"] = self.disc_roles
disc_sec["active"] = self.active_discussion
disc_sec["auto_add"] = self.ui_auto_add_history
# Save MMA State
mma_sec = proj.setdefault("mma", {})
mma_sec["epic"] = self.ui_epic_input
if self.active_track:
# We only persist the basic metadata if full serialization is too complex
# For now, let's try full serialization via asdict
from dataclasses import asdict
mma_sec["active_track"] = asdict(self.active_track)
else:
mma_sec["active_track"] = None
def _flush_to_config(self):
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"history_trunc_limit": self.history_trunc_limit,
}
self.config["ai"]["system_prompt"] = self.ui_global_system_prompt
self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path}
self.config["gui"] = {"show_windows": self.show_windows}
theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list, str, str]:
"""Returns (full_md, output_path, file_items, stable_md, discussion_text)."""
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
track_id = self.active_track.id if self.active_track else None
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id)
full_md, path, file_items = aggregate.run(flat)
# Build stable markdown (no history) for Gemini caching
screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", "."))
screenshots = flat.get("screenshots", {}).get("paths", [])
summary_only = flat.get("project", {}).get("summary_only", False)
stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only)
# Build discussion history text separately
history = flat.get("discussion", {}).get("history", [])
discussion_text = aggregate.build_discussion_text(history)
return full_md, path, file_items, stable_md, discussion_text
def _fetch_models(self, provider: str):
self.ai_status = "fetching models..."
def do_fetch():
try:
models = ai_client.list_models(provider)
self.available_models = models
if self.current_model not in models and models:
self.current_model = models[0]
ai_client.set_provider(self.current_provider, self.current_model)
self.ai_status = f"models loaded: {len(models)}"
except Exception as e:
self.ai_status = f"model fetch error: {e}"
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str):
if imgui.button("[+]##" + str(id(content))):
self.show_text_viewer = True
self.text_viewer_title = label
self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str):
imgui.text_colored(C_LBL, f"{label}:")
imgui.same_line()
if imgui.button("[+]##" + label):
self.show_text_viewer = True
self.text_viewer_title = label
self.text_viewer_content = content
if len(content) > COMMS_CLAMP_CHARS:
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(content)
imgui.pop_text_wrap_pos()
else:
if imgui.begin_child(f"heavy_text_child_{label}", imgui.ImVec2(0, 80), True):
imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end_child()
else:
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(content if content else "(empty)")
imgui.pop_text_wrap_pos()
else:
imgui.text(content if content else "(empty)")
# ---------------------------------------------------------------- gui
def _show_menus(self):
if imgui.begin_menu("Windows"):
for w in self.show_windows.keys():
_, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w])
imgui.end_menu()
if imgui.begin_menu("Project"):
if imgui.menu_item("Save All", "", False)[0]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
if imgui.menu_item("Reset Session", "", False)[0]:
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.ai_status = "session reset"
self.ai_response = ""
if imgui.menu_item("Generate MD Only", "", False)[0]:
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
except Exception as e:
self.ai_status = f"error: {e}"
imgui.end_menu()
def _gui_func(self):
try:
self.perf_monitor.start_frame()
# Process GUI task queue
self._process_pending_gui_tasks()
self._render_track_proposal_modal()
# Auto-save (every 60s)
now = time.time()
if now - self._last_autosave >= self._autosave_interval:
self._last_autosave = now
try:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except Exception:
pass # silent — don't disrupt the GUI loop
# Sync pending comms
with self._pending_comms_lock:
if self._pending_comms and self.ui_auto_scroll_comms:
self._scroll_comms_to_bottom = True
for c in self._pending_comms:
self._comms_log.append(c)
self._pending_comms.clear()
with self._pending_tool_calls_lock:
if self._pending_tool_calls and self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
for tc in self._pending_tool_calls:
self._tool_log.append(tc)
self._pending_tool_calls.clear()
# Sync pending history adds
with self._pending_history_adds_lock:
if self._pending_history_adds:
self._scroll_disc_to_bottom = True
for item in self._pending_history_adds:
if item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
self.disc_entries.append(item)
self._pending_history_adds.clear()
# ---- Menubar
if imgui.begin_main_menu_bar():
if imgui.begin_menu("manual slop"):
if imgui.menu_item("Quit", "Ctrl+Q", False)[0]:
self.should_quit = True
imgui.end_menu()
if imgui.begin_menu("View"):
for name in self.show_windows:
_, self.show_windows[name] = imgui.menu_item(name, "", self.show_windows[name])
imgui.end_menu()
if imgui.begin_menu("Project"):
if imgui.menu_item("Save All", "Ctrl+S", False)[0]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
if imgui.menu_item("Generate MD Only", "", False)[0]:
self._handle_md_only()
if imgui.menu_item("Reset Session", "", False)[0]:
self._handle_reset_session()
imgui.end_menu()
imgui.end_main_menu_bar()
# --- Hubs ---
if self.show_windows.get("Context Hub", False):
exp, self.show_windows["Context Hub"] = imgui.begin("Context Hub", self.show_windows["Context Hub"])
if exp:
self._render_projects_panel()
imgui.end()
if self.show_windows.get("Files & Media", False):
exp, self.show_windows["Files & Media"] = imgui.begin("Files & Media", self.show_windows["Files & Media"])
if exp:
if imgui.collapsing_header("Files"):
self._render_files_panel()
if imgui.collapsing_header("Screenshots"):
self._render_screenshots_panel()
imgui.end()
if self.show_windows.get("AI Settings", False):
exp, self.show_windows["AI Settings"] = imgui.begin("AI Settings", self.show_windows["AI Settings"])
if exp:
if imgui.collapsing_header("Provider & Model"):
self._render_provider_panel()
if imgui.collapsing_header("System Prompts"):
self._render_system_prompts_panel()
imgui.end()
if self.show_windows.get("MMA Dashboard", False):
exp, self.show_windows["MMA Dashboard"] = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"])
if exp:
self._render_mma_dashboard()
imgui.end()
if self.show_windows.get("Theme", False):
self._render_theme_panel()
if self.show_windows.get("Discussion Hub", False):
exp, self.show_windows["Discussion Hub"] = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"])
if exp:
# Top part for the history
if imgui.begin_child("HistoryChild", size=(0, -200)):
self._render_discussion_panel()
imgui.end_child()
# Bottom part with tabs for message and response
if imgui.begin_tab_bar("MessageResponseTabs"):
if imgui.begin_tab_item("Message")[0]:
self._render_message_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Response")[0]:
self._render_response_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
if self.show_windows.get("Operations Hub", False):
exp, self.show_windows["Operations Hub"] = imgui.begin("Operations Hub", self.show_windows["Operations Hub"])
if exp:
if imgui.begin_tab_bar("OperationsTabs"):
if imgui.begin_tab_item("Tool Calls")[0]:
self._render_tool_calls_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Comms History")[0]:
self._render_comms_history_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
if self.show_windows.get("Log Management", False):
self._render_log_management()
if self.show_windows["Diagnostics"]:
exp, self.show_windows["Diagnostics"] = imgui.begin("Diagnostics", self.show_windows["Diagnostics"])
if exp:
now = time.time()
if now - self._perf_last_update >= 0.5:
self._perf_last_update = now
metrics = self.perf_monitor.get_metrics()
self.perf_history["frame_time"].pop(0)
self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0))
self.perf_history["fps"].pop(0)
self.perf_history["fps"].append(metrics.get("fps", 0.0))
self.perf_history["cpu"].pop(0)
self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0))
self.perf_history["input_lag"].pop(0)
self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0))
metrics = self.perf_monitor.get_metrics()
imgui.text("Performance Telemetry")
imgui.separator()
if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h):
imgui.table_setup_column("Metric")
imgui.table_setup_column("Value")
imgui.table_headers_row()
imgui.table_next_row()
imgui.table_next_column()
imgui.text("FPS")
imgui.table_next_column()
imgui.text(f"{metrics.get('fps', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Frame Time (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("CPU %")
imgui.table_next_column()
imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Input Lag (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}")
imgui.end_table()
imgui.separator()
imgui.text("Frame Time (ms)")
imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60))
imgui.text("CPU %")
imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60))
imgui.end()
self.perf_monitor.end_frame()
# ---- Modals / Popups
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
if not self._pending_dialog_open:
imgui.open_popup("Approve PowerShell Command")
self._pending_dialog_open = True
else:
self._pending_dialog_open = False
if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not dlg:
imgui.close_current_popup()
else:
imgui.text("The AI wants to run the following PowerShell script:")
imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}")
imgui.separator()
# Checkbox to toggle full preview inside modal
_, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer)
if self.show_text_viewer:
imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(dlg._script)
imgui.end_child()
else:
ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200))
imgui.separator()
if imgui.button("Approve & Run", imgui.ImVec2(120, 0)):
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(120, 0)):
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.end_popup()
if self._pending_ask_dialog:
if not self._ask_dialog_open:
imgui.open_popup("Approve Tool Execution")
self._ask_dialog_open = True
else:
self._ask_dialog_open = False
if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_ask_dialog:
imgui.close_current_popup()
else:
tool_name = self._ask_tool_data.get("tool", "unknown")
tool_args = self._ask_tool_data.get("args", {})
imgui.text("The AI wants to execute a tool:")
imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}")
imgui.separator()
imgui.text("Arguments:")
imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True)
imgui.text_unformatted(json.dumps(tool_args, indent=2))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_approve_ask()
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Deny", imgui.ImVec2(120, 0)):
self._handle_reject_ask()
imgui.close_current_popup()
imgui.end_popup()
# MMA Step Approval Modal
if self._pending_mma_approval:
if not self._mma_approval_open:
imgui.open_popup("MMA Step Approval")
self._mma_approval_open = True
self._mma_approval_edit_mode = False
self._mma_approval_payload = self._pending_mma_approval.get("payload", "")
else:
self._mma_approval_open = False
if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_approval:
imgui.close_current_popup()
else:
ticket_id = self._pending_mma_approval.get("ticket_id", "??")
imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.")
imgui.separator()
if self._mma_approval_edit_mode:
imgui.text("Edit Raw Payload (Manual Memory Mutation):")
_, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400))
else:
imgui.text("Proposed Tool Call:")
imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(str(self._pending_mma_approval.get("payload", "")))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)):
self._mma_approval_edit_mode = not self._mma_approval_edit_mode
imgui.same_line()
if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False)
imgui.close_current_popup()
imgui.end_popup()
# MMA Spawn Approval Modal
if self._pending_mma_spawn:
if not self._mma_spawn_open:
imgui.open_popup("MMA Spawn Approval")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
else:
self._mma_spawn_open = False
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_spawn:
imgui.close_current_popup()
else:
role = self._pending_mma_spawn.get("role", "??")
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
imgui.separator()
if self._mma_spawn_edit_mode:
imgui.text("Edit Prompt:")
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
imgui.text("Edit Context MD:")
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
else:
imgui.text("Proposed Prompt:")
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
imgui.text_unformatted(self._mma_spawn_prompt)
imgui.end_child()
imgui.text("Proposed Context MD:")
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
imgui.text_unformatted(self._mma_spawn_context)
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
imgui.same_line()
if imgui.button("Abort", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False, abort=True)
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
self._is_script_blinking = True
self._script_blink_start_time = time.time()
try:
imgui.set_window_focus("Last Script Output")
except Exception:
pass
if self._is_script_blinking:
elapsed = time.time() - self._script_blink_start_time
if elapsed > 1.5:
self._is_script_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 60/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha))
imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever)
expanded, self.show_script_output = imgui.begin("Last Script Output", self.show_script_output)
if expanded:
imgui.text("Script:")
imgui.same_line()
self._render_text_viewer("Last Script", self.ui_last_script_text)
if self.ui_word_wrap:
imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_text)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.text("Output:")
imgui.same_line()
self._render_text_viewer("Last Output", self.ui_last_script_output)
if self.ui_word_wrap:
imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_output)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
if self._is_script_blinking:
imgui.pop_style_color(2)
imgui.end()
if self.show_text_viewer:
imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever)
expanded, self.show_text_viewer = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer)
if expanded:
if self.ui_word_wrap:
imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.text_viewer_content)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end()
except Exception as e:
print(f"ERROR in _gui_func: {e}")
import traceback
traceback.print_exc()
def _render_projects_panel(self):
proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem)
imgui.text_colored(C_IN, f"Active: {proj_name}")
imgui.separator()
imgui.text("Git Directory")
ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir)
imgui.same_line()
if imgui.button("Browse##git"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Git Directory")
r.destroy()
if d: self.ui_project_git_dir = d
imgui.separator()
imgui.text("Main Context File")
ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context)
imgui.same_line()
if imgui.button("Browse##ctx"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select Main Context File")
r.destroy()
if p: self.ui_project_main_context = p
imgui.separator()
imgui.text("Output Dir")
ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir)
imgui.same_line()
if imgui.button("Browse##out"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Output Dir")
r.destroy()
if d: self.ui_output_dir = d
imgui.separator()
imgui.text("Project Files")
imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True)
for i, pp in enumerate(self.project_paths):
is_active = (pp == self.active_project_path)
if imgui.button(f"x##p{i}"):
removed = self.project_paths.pop(i)
if removed == self.active_project_path and self.project_paths:
self._switch_project(self.project_paths[0])
break
imgui.same_line()
marker = " *" if is_active else ""
if is_active: imgui.push_style_color(imgui.Col_.text, C_IN)
if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"):
self._switch_project(pp)
if is_active: imgui.pop_style_color()
imgui.same_line()
imgui.text_colored(C_LBL, pp)
imgui.end_child()
if imgui.button("Add Project"):
r = hide_tk_root()
p = filedialog.askopenfilename(
title="Select Project .toml",
filetypes=[("TOML", "*.toml"), ("All", "*.*")],
)
r.destroy()
if p and p not in self.project_paths:
self.project_paths.append(p)
imgui.same_line()
if imgui.button("New Project"):
r = hide_tk_root()
p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")])
r.destroy()
if p:
name = Path(p).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, p)
if p not in self.project_paths:
self.project_paths.append(p)
self._switch_project(p)
imgui.same_line()
if imgui.button("Save All"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap)
ch, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only)
ch, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms)
ch, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls)
if imgui.collapsing_header("Agent Tools"):
for t_name in AGENT_TOOL_NAMES:
val = self.ui_agent_tools.get(t_name, True)
ch, val = imgui.checkbox(f"Enable {t_name}", val)
if ch:
self.ui_agent_tools[t_name] = val
imgui.separator()
imgui.text_colored(C_LBL, 'MMA Orchestration')
_, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80))
if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)):
self._cb_plan_epic()
def _cb_plan_epic(self):
def _bg_task():
try:
self.ai_status = "Planning Epic (Tier 1)..."
history = orchestrator_pm.get_track_history_summary()
proj = project_manager.load_project(self.active_project_path)
flat = project_manager.flat_config(proj)
file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))
tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": {
"text": json.dumps(tracks, indent=2),
"stream_id": "Tier 1",
"status": "Epic tracks generated."
}
})
self._pending_gui_tasks.append({
"action": "show_track_proposal",
"payload": tracks
})
except Exception as e:
self.ai_status = f"Epic plan error: {e}"
print(f"ERROR in _cb_plan_epic background task: {e}")
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_accept_tracks(self):
def _bg_task():
for track_data in self.proposed_tracks:
self._start_track_logic(track_data)
self.ai_status = "Tracks accepted and execution started."
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data=None):
idx = 0
if isinstance(user_data, int):
idx = user_data
elif isinstance(user_data, dict):
idx = user_data.get("index", 0)
if 0 <= idx < len(self.proposed_tracks):
track_data = self.proposed_tracks[idx]
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
self.ai_status = f"Track '{title}' started."
def _start_track_logic(self, track_data):
try:
goal = track_data.get("goal", "")
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self.ai_status = f"Phase 2: Generating tickets for {title}..."
# 1. Get skeletons for context
parser = ASTParser(language="python")
skeletons = ""
for i, file_path in enumerate(self.files):
try:
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
abs_path = Path(self.ui_files_base_dir) / file_path
if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f:
code = f.read()
skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n"
except Exception as e:
print(f"Error parsing skeleton for {file_path}: {e}")
self.ai_status = "Phase 2: Calling Tech Lead..."
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
if not raw_tickets:
self.ai_status = f"Error: No tickets generated for track: {title}"
print(f"Warning: No tickets generated for track: {title}")
return
self.ai_status = "Phase 2: Sorting tickets..."
try:
sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets)
except ValueError as e:
print(f"Dependency error in track '{title}': {e}")
sorted_tickets_data = raw_tickets
# 3. Create Track and Ticket objects
tickets = []
for t_data in sorted_tickets_data:
ticket = Ticket(
id=t_data["id"],
description=t_data.get("description") or t_data.get("goal", "No description"),
status=t_data.get("status", "todo"),
assigned_to=t_data.get("assigned_to", "unassigned"),
depends_on=t_data.get("depends_on", []),
step_mode=t_data.get("step_mode", False)
)
tickets.append(ticket)
track_id = f"track_{uuid.uuid4().hex[:8]}"
track = Track(id=track_id, description=title, tickets=tickets)
# Initialize track state in the filesystem
from models import TrackState, Metadata
from datetime import datetime
now = datetime.now()
meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now)
state = TrackState(metadata=meta, discussion=[], tasks=tickets)
project_manager.save_track_state(track_id, state, self.ui_files_base_dir)
# 4. Initialize ConductorEngine and run loop
engine = multi_agent_conductor.ConductorEngine(track, self.event_queue)
# Use current full markdown context for the track execution
track_id_param = track.id
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param)
full_md, _, _ = aggregate.run(flat)
# Schedule the coroutine on the internal event loop
asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
except Exception as e:
self.ai_status = f"Track start error: {e}"
print(f"ERROR in _start_track_logic: {e}")
def _render_track_proposal_modal(self):
if self._show_track_proposal_modal:
imgui.open_popup("Track Proposal")
if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]:
imgui.text_colored(C_IN, "Proposed Implementation Tracks")
imgui.separator()
if not self.proposed_tracks:
imgui.text("No tracks generated.")
else:
for idx, track in enumerate(self.proposed_tracks):
imgui.text_colored(C_LBL, f"Track {idx+1}: {track.get('title', 'Untitled')}")
imgui.text_wrapped(f"Goal: {track.get('goal', 'N/A')}")
if imgui.button(f"Start This Track##{idx}"):
self._cb_start_track(idx)
imgui.separator()
if imgui.button("Accept", imgui.ImVec2(120, 0)):
self._cb_accept_tracks()
self._show_track_proposal_modal = False
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Cancel", imgui.ImVec2(120, 0)):
self._show_track_proposal_modal = False
imgui.close_current_popup()
imgui.end_popup()
def _render_log_management(self):
exp, self.show_windows["Log Management"] = imgui.begin("Log Management", self.show_windows["Log Management"])
if not exp:
imgui.end()
return
registry = LogRegistry("logs/log_registry.toml")
sessions = registry.data
if imgui.begin_table("sessions_table", 7, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
imgui.table_setup_column("Session ID")
imgui.table_setup_column("Start Time")
imgui.table_setup_column("Star")
imgui.table_setup_column("Reason")
imgui.table_setup_column("Size (KB)")
imgui.table_setup_column("Msgs")
imgui.table_setup_column("Actions")
imgui.table_headers_row()
for session_id, s_data in sessions.items():
imgui.table_next_row()
imgui.table_next_column()
imgui.text(session_id)
imgui.table_next_column()
imgui.text(s_data.get("start_time", ""))
imgui.table_next_column()
whitelisted = s_data.get("whitelisted", False)
if whitelisted:
imgui.text_colored(vec4(255, 215, 0), "YES")
else:
imgui.text("NO")
metadata = s_data.get("metadata") or {}
imgui.table_next_column()
imgui.text(metadata.get("reason", ""))
imgui.table_next_column()
imgui.text(str(metadata.get("size_kb", "")))
imgui.table_next_column()
imgui.text(str(metadata.get("message_count", "")))
imgui.table_next_column()
if whitelisted:
if imgui.button(f"Unstar##{session_id}"):
registry.update_session_metadata(
session_id,
message_count=metadata.get("message_count"),
errors=metadata.get("errors"),
size_kb=metadata.get("size_kb"),
whitelisted=False,
reason=metadata.get("reason")
)
else:
if imgui.button(f"Star##{session_id}"):
registry.update_session_metadata(
session_id,
message_count=metadata.get("message_count"),
errors=metadata.get("errors"),
size_kb=metadata.get("size_kb"),
whitelisted=True,
reason="Manually whitelisted"
)
imgui.end_table()
imgui.end()
def _render_files_panel(self):
imgui.text("Base Dir")
ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
imgui.same_line()
if imgui.button("Browse##fb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_files_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True)
for i, f in enumerate(self.files):
if imgui.button(f"x##f{i}"):
self.files.pop(i)
break
imgui.same_line()
imgui.text(f)
imgui.end_child()
if imgui.button("Add File(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames()
r.destroy()
for p in paths:
if p not in self.files: self.files.append(p)
imgui.same_line()
if imgui.button("Add Wildcard"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.files.append(str(Path(d) / "**" / "*"))
def _render_screenshots_panel(self):
imgui.text("Base Dir")
ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
imgui.same_line()
if imgui.button("Browse##sb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_shots_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True)
for i, s in enumerate(self.screenshots):
if imgui.button(f"x##s{i}"):
self.screenshots.pop(i)
break
imgui.same_line()
imgui.text(s)
imgui.end_child()
if imgui.button("Add Screenshot(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames(
title="Select Screenshots",
filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")],
)
r.destroy()
for p in paths:
if p not in self.screenshots: self.screenshots.append(p)
def _render_discussion_panel(self):
# THINKING indicator
is_thinking = self.ai_status in ["sending..."]
if is_thinking:
val = math.sin(time.time() * 10 * math.pi)
alpha = 1.0 if val > 0 else 0.0
imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...")
imgui.separator()
# Prior session viewing mode
if self.is_viewing_prior_session:
imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20))
imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
imgui.same_line()
if imgui.button("Exit Prior Session"):
self.is_viewing_prior_session = False
self.prior_session_entries.clear()
imgui.separator()
imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False)
for idx, entry in enumerate(self.prior_session_entries):
imgui.push_id(f"prior_{idx}")
kind = entry.get("kind", entry.get("type", ""))
imgui.text_colored(C_LBL, f"#{idx+1}")
imgui.same_line()
ts = entry.get("ts", entry.get("timestamp", ""))
if ts:
imgui.text_colored(vec4(160, 160, 160), str(ts))
imgui.same_line()
imgui.text_colored(C_KEY, str(kind))
payload = entry.get("payload", entry)
text = payload.get("text", payload.get("message", payload.get("content", "")))
if text:
preview = str(text).replace("\\n", " ")[:200]
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(preview)
imgui.pop_text_wrap_pos()
else:
imgui.text(preview)
imgui.separator()
imgui.pop_id()
imgui.end_child()
imgui.pop_style_color()
return
if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open):
names = self._get_discussion_names()
if imgui.begin_combo("##disc_sel", self.active_discussion):
for name in names:
is_selected = (name == self.active_discussion)
if imgui.selectable(name, is_selected)[0]:
self._switch_discussion(name)
if is_selected:
imgui.set_item_default_focus()
imgui.end_combo()
disc_sec = self.project.get("discussion", {})
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
git_commit = disc_data.get("git_commit", "")
last_updated = disc_data.get("last_updated", "")
imgui.text_colored(C_LBL, "commit:")
imgui.same_line()
imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)")
imgui.same_line()
if imgui.button("Update Commit"):
git_dir = self.ui_project_git_dir
if git_dir:
cmt = project_manager.get_git_commit(git_dir)
if cmt:
disc_data["git_commit"] = cmt
disc_data["last_updated"] = project_manager.now_ts()
self.ai_status = f"commit: {cmt[:12]}"
imgui.text_colored(C_LBL, "updated:")
imgui.same_line()
imgui.text_colored(C_SUB, last_updated if last_updated else "(never)")
ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input)
imgui.same_line()
if imgui.button("Create"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._create_discussion(nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Rename"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Delete"):
self._delete_discussion(self.active_discussion)
if not self.is_viewing_prior_session:
imgui.separator()
if imgui.button("+ Entry"):
self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
if imgui.button("-All"):
for e in self.disc_entries: e["collapsed"] = True
imgui.same_line()
if imgui.button("+All"):
for e in self.disc_entries: e["collapsed"] = False
imgui.same_line()
if imgui.button("Clear All"):
self.disc_entries.clear()
imgui.same_line()
if imgui.button("Save"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "discussion saved"
ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history)
# Truncation controls
imgui.text("Keep Pairs:")
imgui.same_line()
imgui.set_next_item_width(80)
ch, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1)
if self.ui_disc_truncate_pairs < 1: self.ui_disc_truncate_pairs = 1
imgui.same_line()
if imgui.button("Truncate"):
self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs)
self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs"
imgui.separator()
if imgui.collapsing_header("Roles"):
imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True)
for i, r in enumerate(self.disc_roles):
if imgui.button(f"x##r{i}"):
self.disc_roles.pop(i)
break
imgui.same_line()
imgui.text(r)
imgui.end_child()
ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input)
imgui.same_line()
if imgui.button("Add"):
r = self.ui_disc_new_role_input.strip()
if r and r not in self.disc_roles:
self.disc_roles.append(r)
self.ui_disc_new_role_input = ""
imgui.separator()
imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False)
clipper = imgui.ListClipper()
clipper.begin(len(self.disc_entries))
while clipper.step():
for i in range(clipper.display_start, clipper.display_end):
entry = self.disc_entries[i]
imgui.push_id(str(i))
collapsed = entry.get("collapsed", False)
read_mode = entry.get("read_mode", False)
if imgui.button("+" if collapsed else "-"):
entry["collapsed"] = not collapsed
imgui.same_line()
imgui.set_next_item_width(120)
if imgui.begin_combo("##role", entry["role"]):
for r in self.disc_roles:
if imgui.selectable(r, r == entry["role"])[0]:
entry["role"] = r
imgui.end_combo()
if not collapsed:
imgui.same_line()
if imgui.button("[Edit]" if read_mode else "[Read]"):
entry["read_mode"] = not read_mode
ts_str = entry.get("ts", "")
if ts_str:
imgui.same_line()
imgui.text_colored(vec4(120, 120, 100), str(ts_str))
if collapsed:
imgui.same_line()
if imgui.button("Ins"):
self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
self._render_text_viewer(f"Entry #{i+1}", entry["content"])
imgui.same_line()
if imgui.button("Del"):
self.disc_entries.pop(i)
imgui.pop_id()
break # Break from inner loop, clipper will re-step
imgui.same_line()
preview = entry["content"].replace("\\n", " ")[:60]
if len(entry["content"]) > 60: preview += "..."
imgui.text_colored(vec4(160, 160, 150), preview)
if not collapsed:
if read_mode:
imgui.begin_child("read_content", imgui.ImVec2(0, 150), True)
if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(entry["content"])
if self.ui_word_wrap: imgui.pop_text_wrap_pos()
imgui.end_child()
else:
ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150))
imgui.separator()
imgui.pop_id()
if self._scroll_disc_to_bottom:
imgui.set_scroll_here_y(1.0)
self._scroll_disc_to_bottom = False
imgui.end_child()
def _render_provider_panel(self):
imgui.text("Provider")
if imgui.begin_combo("##prov", self.current_provider):
for p in PROVIDERS:
if imgui.selectable(p, p == self.current_provider)[0]:
self.current_provider = p
imgui.end_combo()
imgui.separator()
imgui.text("Model")
imgui.same_line()
if imgui.button("Fetch Models"):
self._fetch_models(self.current_provider)
if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)):
for m in self.available_models:
if imgui.selectable(m, m == self.current_model)[0]:
self.current_model = m
imgui.end_list_box()
imgui.separator()
imgui.text("Parameters")
ch, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f")
ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024)
ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024)
if self.current_provider == "gemini_cli":
imgui.separator()
imgui.text("Gemini CLI")
sid = "None"
if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter:
sid = ai_client._gemini_cli_adapter.session_id or "None"
imgui.text(f"Session ID: {sid}")
if imgui.button("Reset CLI Session"):
ai_client.reset_session()
imgui.text("Binary Path")
ch, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path)
imgui.same_line()
if imgui.button("Browse##gcli"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select gemini CLI binary")
r.destroy()
if p:
self.ui_gemini_cli_path = p
if ch:
if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
imgui.separator()
imgui.text("Telemetry")
usage = self.session_usage
total = usage["input_tokens"] + usage["output_tokens"]
if total == 0 and usage.get("total_tokens", 0) > 0:
total = usage["total_tokens"]
imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})")
if usage.get("last_latency", 0.0) > 0:
imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s")
if usage["cache_read_input_tokens"]:
imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}")
imgui.text("Token Budget:")
imgui.progress_bar(self._token_budget_pct, imgui.ImVec2(-1, 0), f"{self._token_budget_current:,} / {self._token_budget_limit:,}")
if self._gemini_cache_text:
imgui.text_colored(C_SUB, self._gemini_cache_text)
def _render_message_panel(self):
# LIVE indicator
is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
if is_live:
val = math.sin(time.time() * 10 * math.pi)
alpha = 1.0 if val > 0 else 0.0
imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, alpha), "LIVE")
imgui.separator()
ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40))
# Keyboard shortcuts
io = imgui.get_io()
ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter)
ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l)
if ctrl_l:
self.ui_ai_input = ""
imgui.separator()
send_busy = False
with self._send_thread_lock:
if self.send_thread and self.send_thread.is_alive():
send_busy = True
if (imgui.button("Gen + Send") or ctrl_enter) and not send_busy:
self._handle_generate_send()
imgui.same_line()
if imgui.button("MD Only"):
self._handle_md_only()
imgui.same_line()
if imgui.button("Reset"):
self._handle_reset_session()
imgui.same_line()
if imgui.button("-> History"):
if self.ui_ai_input:
self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()})
def _render_response_panel(self):
if self._trigger_blink:
self._trigger_blink = False
self._is_blinking = True
self._blink_start_time = time.time()
try:
imgui.set_window_focus("Response")
except:
pass
is_blinking = False
if self._is_blinking:
elapsed = time.time() - self._blink_start_time
if elapsed > 1.5:
self._is_blinking = False
else:
is_blinking = True
val = math.sin(elapsed * 8 * math.pi)
alpha = 50/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha))
# --- Always Render Content ---
if self.ui_word_wrap:
imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ai_response)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only)
imgui.separator()
if imgui.button("-> History"):
if self.ai_response:
self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
if is_blinking:
imgui.pop_style_color(2)
def _cb_ticket_retry(self, ticket_id):
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'todo'
break
asyncio.run_coroutine_threadsafe(
self.event_queue.put("mma_retry", {"ticket_id": ticket_id}),
self._loop
)
def _cb_ticket_skip(self, ticket_id):
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'skipped'
break
asyncio.run_coroutine_threadsafe(
self.event_queue.put("mma_skip", {"ticket_id": ticket_id}),
self._loop
)
def _render_mma_dashboard(self):
# 1. Track Browser
imgui.text("Track Browser")
if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
imgui.table_setup_column("Title")
imgui.table_setup_column("Status")
imgui.table_setup_column("Progress")
imgui.table_setup_column("Actions")
imgui.table_headers_row()
for track in self.tracks:
imgui.table_next_row()
imgui.table_next_column()
imgui.text(track.get("title", "Untitled"))
imgui.table_next_column()
imgui.text(track.get("status", "unknown"))
imgui.table_next_column()
progress = track.get("progress", 0.0)
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%")
imgui.table_next_column()
if imgui.button(f"Load##{track.get('id')}"):
self._cb_load_track(track.get("id"))
imgui.end_table()
imgui.separator()
# 2. Global Controls
changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)
if changed:
# We could push an event here if the engine needs to know immediately
pass
imgui.same_line()
imgui.text(f"Status: {self.mma_status.upper()}")
if self.active_tier:
imgui.same_line()
imgui.text_colored(C_VAL, f"| Active: {self.active_tier}")
imgui.separator()
# 2. Active Track Info
if self.active_track:
imgui.text(f"Track: {self.active_track.description}")
# Progress bar
tickets = self.active_tickets
total = len(tickets)
if total > 0:
complete = sum(1 for t in tickets if t.get('status') == 'complete')
progress = complete / total
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets")
else:
imgui.text_disabled("No active MMA track.")
# 3. Token Usage Table
imgui.separator()
imgui.text("Tier Usage (Tokens)")
if imgui.begin_table("mma_usage", 3, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg):
imgui.table_setup_column("Tier")
imgui.table_setup_column("Input")
imgui.table_setup_column("Output")
imgui.table_headers_row()
usage = self.mma_tier_usage
for tier, stats in usage.items():
imgui.table_next_row()
imgui.table_next_column()
imgui.text(tier)
imgui.table_next_column()
imgui.text(f"{stats.get('input', 0):,}")
imgui.table_next_column()
imgui.text(f"{stats.get('output', 0):,}")
imgui.end_table()
imgui.separator()
imgui.separator()
imgui.text("Strategy (Tier 1)")
strategy_text = self.mma_streams.get("Tier 1", "")
imgui.input_text_multiline("##mma_strategy", strategy_text, imgui.ImVec2(-1, 150), imgui.InputTextFlags_.read_only)
# 4. Task DAG Visualizer
imgui.text("Task DAG")
if self.active_track:
tickets_by_id = {t.get('id'): t for t in self.active_tickets}
all_ids = set(tickets_by_id.keys())
# Build children map
children_map = {}
for t in self.active_tickets:
for dep in t.get('depends_on', []):
if dep not in children_map: children_map[dep] = []
children_map[dep].append(t.get('id'))
# Roots are those whose depends_on elements are NOT in all_ids
roots = []
for t in self.active_tickets:
deps = t.get('depends_on', [])
has_local_dep = any(d in all_ids for d in deps)
if not has_local_dep:
roots.append(t)
rendered = set()
for root in roots:
self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered)
else:
imgui.text_disabled("No active MMA track.")
def _render_ticket_dag_node(self, ticket, tickets_by_id, children_map, rendered):
tid = ticket.get('id', '??')
target = ticket.get('target_file', 'general')
status = ticket.get('status', 'pending').upper()
# Determine color
status_color = vec4(200, 200, 200) # Gray (TODO)
if status == 'RUNNING':
status_color = vec4(255, 255, 0) # Yellow
elif status == 'COMPLETE':
status_color = vec4(0, 255, 0) # Green
elif status in ['BLOCKED', 'ERROR']:
status_color = vec4(255, 0, 0) # Red
elif status == 'PAUSED':
status_color = vec4(255, 165, 0) # Orange
flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open
children = children_map.get(tid, [])
if not children:
flags |= imgui.TreeNodeFlags_.leaf
# Check if already rendered elsewhere to avoid infinite recursion or duplicate subtrees
is_duplicate = tid in rendered
node_open = imgui.tree_node_ex(f"##{tid}", flags)
# Detail View / Tooltip
if imgui.is_item_hovered():
imgui.begin_tooltip()
imgui.text_colored(C_KEY, f"ID: {tid}")
imgui.text_colored(C_LBL, f"Target: {target}")
imgui.text_colored(C_LBL, f"Description:")
imgui.same_line()
imgui.text_wrapped(ticket.get('description', 'N/A'))
deps = ticket.get('depends_on', [])
if deps:
imgui.text_colored(C_LBL, f"Depends on: {', '.join(deps)}")
stream_key = f"Tier 3: {tid}"
if stream_key in self.mma_streams:
imgui.separator()
imgui.text_colored(C_KEY, "Worker Stream:")
imgui.text_wrapped(self.mma_streams[stream_key])
imgui.end_tooltip()
imgui.same_line()
imgui.text_colored(C_KEY, tid)
imgui.same_line(150)
imgui.text_disabled(str(target))
imgui.same_line(400)
imgui.text_colored(status_color, status)
imgui.same_line(500)
if imgui.button(f"Retry##{tid}"):
self._cb_ticket_retry(tid)
imgui.same_line()
if imgui.button(f"Skip##{tid}"):
self._cb_ticket_skip(tid)
if node_open:
if not is_duplicate:
rendered.add(tid)
for child_id in children:
child = tickets_by_id.get(child_id)
if child:
self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered)
else:
imgui.text_disabled(" (shown above)")
imgui.tree_pop()
def _render_tool_calls_panel(self):
imgui.text("Tool call history")
imgui.same_line()
if imgui.button("Clear##tc"):
self._tool_log.clear()
imgui.separator()
imgui.begin_child("tc_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)
log_copy = list(self._tool_log)
for idx_minus_one, entry in enumerate(log_copy):
idx = idx_minus_one + 1
# Handle both old (tuple) and new (tuple with ts) entries
if len(entry) == 3:
script, result, local_ts = entry
else:
script, result = entry
local_ts = 0
# Blink effect
blink_alpha = 0.0
if local_ts > 0:
elapsed = time.time() - local_ts
if elapsed < 3.0:
blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5)
imgui.push_id(f"tc_entry_{idx}")
if blink_alpha > 0:
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha))
imgui.begin_group()
first_line = script.strip().splitlines()[0][:80] if script.strip() else "(empty)"
imgui.text_colored(C_KEY, f"Call #{idx}: {first_line}")
# Script Display
imgui.text_colored(C_LBL, "Script:")
imgui.same_line()
if imgui.button(f"[+]##script_{idx}"):
self.show_text_viewer = True
self.text_viewer_title = f"Call Script #{idx}"
self.text_viewer_content = script
if self.ui_word_wrap:
imgui.begin_child(f"tc_script_wrap_{idx}", imgui.ImVec2(-1, 72), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(script)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.begin_child(f"tc_script_fixed_width_{idx}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar)
imgui.input_text_multiline(f"##tc_script_res_{idx}", script, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end_child()
# Result Display
imgui.text_colored(C_LBL, "Output:")
imgui.same_line()
if imgui.button(f"[+]##output_{idx}"):
self.show_text_viewer = True
self.text_viewer_title = f"Call Output #{idx}"
self.text_viewer_content = result
if self.ui_word_wrap:
imgui.begin_child(f"tc_res_wrap_{idx}", imgui.ImVec2(-1, 72), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(result)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.begin_child(f"tc_res_fixed_width_{idx}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar)
imgui.input_text_multiline(f"##tc_res_val_{idx}", result, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end_child()
imgui.separator()
if blink_alpha > 0:
imgui.end_group()
imgui.pop_style_color()
imgui.pop_id()
if self._scroll_tool_calls_to_bottom:
imgui.set_scroll_here_y(1.0)
self._scroll_tool_calls_to_bottom = False
imgui.end_child()
def _render_comms_history_panel(self):
imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
imgui.same_line()
if imgui.button("Clear##comms"):
ai_client.clear_comms_log()
self._comms_log.clear()
imgui.same_line()
if imgui.button("Load Log"):
self.cb_load_prior_log()
if self.is_viewing_prior_session:
imgui.same_line()
if imgui.button("Exit Prior Session"):
self.is_viewing_prior_session = False
self.prior_session_entries.clear()
self.ai_status = "idle"
imgui.separator()
imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
imgui.separator()
imgui.text_colored(C_OUT, "OUT")
imgui.same_line()
imgui.text_colored(C_REQ, "request")
imgui.same_line()
imgui.text_colored(C_TC, "tool_call")
imgui.same_line()
imgui.text(" ")
imgui.same_line()
imgui.text_colored(C_IN, "IN")
imgui.same_line()
imgui.text_colored(C_RES, "response")
imgui.same_line()
imgui.text_colored(C_TR, "tool_result")
imgui.separator()
# Use tinted background for prior session
if self.is_viewing_prior_session:
imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20))
imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)
log_to_render = self.prior_session_entries if self.is_viewing_prior_session else list(self._comms_log)
for idx_minus_one, entry in enumerate(log_to_render):
idx = idx_minus_one + 1
local_ts = entry.get("local_ts", 0)
# Blink effect
blink_alpha = 0.0
if local_ts > 0 and not self.is_viewing_prior_session:
elapsed = time.time() - local_ts
if elapsed < 3.0:
blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5)
imgui.push_id(f"comms_{idx}")
if blink_alpha > 0:
# Draw a background highlight for the entry
draw_list = imgui.get_window_draw_list()
p_min = imgui.get_cursor_screen_pos()
# Estimate height or just use a fixed height for the background
# It's better to wrap the entry in a group or just use separators
# For now, let's just use the style color push if we are sure we pop it
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, blink_alpha))
# We still need a child or a group to apply the background to
imgui.begin_group()
d = entry.get("direction", "IN")
k = entry.get("kind", "response")
imgui.text_colored(vec4(160, 160, 160), f"#{idx}")
imgui.same_line()
imgui.text_colored(vec4(160, 160, 160), entry.get("ts", "00:00:00"))
imgui.same_line()
imgui.text_colored(DIR_COLORS.get(d, C_VAL), d)
imgui.same_line()
imgui.text_colored(KIND_COLORS.get(k, C_VAL), k)
imgui.same_line()
imgui.text_colored(C_LBL, f"{entry.get('provider', '?')}/{entry.get('model', '?')}")
payload = entry.get("payload", {})
if k == "request":
self._render_heavy_text("message", payload.get("message", ""))
elif k == "response":
imgui.text_colored(C_LBL, "round:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("round", "")))
imgui.text_colored(C_LBL, "stop_reason:")
imgui.same_line()
imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", "")))
text = payload.get("text", "")
if text: self._render_heavy_text("text", text)
imgui.text_colored(C_LBL, "tool_calls:")
tcs = payload.get("tool_calls", [])
if not tcs: imgui.text_colored(C_VAL, " (none)")
for tc_i, tc in enumerate(tcs):
imgui.text_colored(C_KEY, f" call[{tc_i}] {tc.get('name', '?')}")
if tc.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(tc["id"]))
if "args" in tc or "input" in tc:
self._render_heavy_text(f"call_{tc_i}_args", str(tc.get("args") or tc.get("input")))
elif k == "tool_call":
imgui.text_colored(C_KEY, payload.get("name", "?"))
if payload.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "script" in payload: self._render_heavy_text("script", payload["script"])
if "args" in payload: self._render_heavy_text("args", str(payload["args"]))
elif k == "tool_result":
imgui.text_colored(C_KEY, payload.get("name", "?"))
if payload.get("id"):
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "output" in payload: self._render_heavy_text("output", payload["output"])
if "results" in payload:
for r_i, r in enumerate(payload["results"]):
imgui.text_colored(C_LBL, f" Result[{r_i}]:")
self._render_heavy_text(f"res_{r_i}", str(r))
if "usage" in payload:
u = payload["usage"]
u_str = f"In: {u.get('input_tokens', 0)} Out: {u.get('output_tokens', 0)}"
if u.get("cache_read_input_tokens"): u_str += f" (Cache: {u['cache_read_input_tokens']})"
imgui.text_colored(C_SUB, f" Usage: {u_str}")
imgui.separator()
if blink_alpha > 0:
imgui.end_group()
imgui.pop_style_color()
imgui.pop_id()
if self._scroll_comms_to_bottom:
imgui.set_scroll_here_y(1.0)
self._scroll_comms_to_bottom = False
imgui.end_child()
if self.is_viewing_prior_session:
imgui.pop_style_color()
def _render_system_prompts_panel(self):
imgui.text("Global System Prompt (all projects)")
ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100))
imgui.separator()
imgui.text("Project System Prompt")
ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100))
def _render_theme_panel(self):
exp, self.show_windows["Theme"] = imgui.begin("Theme", self.show_windows["Theme"])
if exp:
imgui.text("Palette")
cp = theme.get_current_palette()
if imgui.begin_combo("##pal", cp):
for p in theme.get_palette_names():
if imgui.selectable(p, p == cp)[0]:
theme.apply(p)
imgui.end_combo()
imgui.separator()
imgui.text("Font")
imgui.push_item_width(-150)
ch, path = imgui.input_text("##fontp", theme.get_current_font_path())
imgui.pop_item_width()
if ch: theme._current_font_path = path
imgui.same_line()
if imgui.button("Browse##font"):
r = hide_tk_root()
p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")])
r.destroy()
if p: theme._current_font_path = p
imgui.text("Size (px)")
imgui.same_line()
imgui.push_item_width(100)
ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f")
if ch: theme._current_font_size = size
imgui.pop_item_width()
imgui.same_line()
if imgui.button("Apply Font (Requires Restart)"):
self._flush_to_config()
save_config(self.config)
self.ai_status = "Font settings saved. Restart required."
imgui.separator()
imgui.text("UI Scale (DPI)")
ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f")
if ch: theme.set_scale(scale)
imgui.end()
def _load_fonts(self):
font_path, font_size = theme.get_font_loading_params()
if font_path and Path(font_path).exists():
hello_imgui.load_font(font_path, font_size)
def _post_init(self):
theme.apply_current()
def run(self):
"""Initializes the ImGui runner and starts the main application loop."""
if "--headless" in sys.argv:
print("Headless mode active")
self._fetch_models(self.current_provider)
import uvicorn
headless_cfg = self.config.get("headless", {})
port = headless_cfg.get("port", 8000)
api = self.create_api()
uvicorn.run(api, host="0.0.0.0", port=port)
else:
theme.load_from_config(self.config)
self.runner_params = hello_imgui.RunnerParams()
self.runner_params.app_window_params.window_title = "manual slop"
self.runner_params.app_window_params.window_geometry.size = (1680, 1200)
self.runner_params.imgui_window_params.enable_viewports = False
self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
self.runner_params.fps_idling.enable_idling = False
self.runner_params.imgui_window_params.show_menu_bar = True
self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder
self.runner_params.ini_filename = "manualslop_layout.ini"
self.runner_params.callbacks.show_gui = self._gui_func
self.runner_params.callbacks.show_menus = self._show_menus
self.runner_params.callbacks.load_additional_fonts = self._load_fonts
self.runner_params.callbacks.post_init = self._post_init
self._fetch_models(self.current_provider)
# Start API hooks server (if enabled)
self.hook_server = api_hooks.HookServer(self)
self.hook_server.start()
immapp.run(self.runner_params)
# On exit
self.hook_server.stop()
self.perf_monitor.stop()
ai_client.cleanup() # Destroy active API caches to stop billing
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
session_logger.close_session()
def main():
app = App()
app.run()
if __name__ == "__main__":
main()