Files
manual_slop/temp_gui_old2.py

2163 lines
170 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# gui_2.py
from __future__ import annotations
import tomli_w
import threading
import asyncio
import time
import math
import json
import sys
import os
import uuid
import requests # type: ignore[import-untyped]
from pathlib import Path
from tkinter import filedialog, Tk
from typing import Optional, Callable, Any
import aggregate
import ai_client
import cost_tracker
from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import theme_2 as theme
import tomllib
import events
import numpy as np
import api_hooks
import mcp_client
import orchestrator_pm
from performance_monitor import PerformanceMonitor
from log_registry import LogRegistry
from log_pruner import LogPruner
import conductor_tech_lead
import multi_agent_conductor
from models import Track, Ticket, DISC_ROLES, AGENT_TOOL_NAMES, CONFIG_PATH, load_config, parse_history_entries
from app_controller import AppController
from file_cache import ASTParser
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from imgui_bundle import imgui, hello_imgui, immapp
# Provider identifiers known to the GUI (the code that consumes this list is
# outside this view).
PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"]
# Character-count threshold: _render_heavy_text switches to its long-content
# layout when the text is longer than this.
COMMS_CLAMP_CHARS: int = 300
def save_config(config: dict[str, Any]) -> None:
    """Persist *config* to ``CONFIG_PATH`` as a TOML document.

    The document is serialised in memory before the file is opened. Opening
    with ``"wb"`` truncates the file immediately, so serialising first means a
    value that tomli_w cannot encode raises while the previous config on disk
    is still intact.

    Args:
        config: Configuration mapping to write.

    Raises:
        Exception: whatever ``tomli_w.dumps`` raises for unsupported values,
            or ``OSError`` if the file cannot be opened or written.
    """
    # tomli_w.dump() writes the same UTF-8 encoded text chunk by chunk.
    payload = tomli_w.dumps(config).encode("utf-8")
    with open(CONFIG_PATH, "wb") as f:
        f.write(payload)
def hide_tk_root() -> Tk:
    """Create a Tk root window, withdraw it and mark it "-topmost".

    Returns:
        The withdrawn root; the callers in this file destroy it once their
        file dialog has returned.
    """
    hidden_root = Tk()
    hidden_root.withdraw()
    hidden_root.wm_attributes("-topmost", True)
    return hidden_root
# Color Helpers
def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4:
    """Build an ``imgui.ImVec4`` from RGB components given on a 0-255 scale.

    Each of *r*, *g*, *b* is divided by 255; *a* is passed through unchanged.
    """
    red = r / 255
    green = g / 255
    blue = b / 255
    return imgui.ImVec4(red, green, blue, a)
# Named UI colours. Components are written as 0-255 RGB and normalised by vec4().
C_OUT: imgui.ImVec4 = vec4(100, 200, 255)
C_IN: imgui.ImVec4 = vec4(140, 255, 160)
C_REQ: imgui.ImVec4 = vec4(255, 220, 100)
C_RES: imgui.ImVec4 = vec4(180, 255, 180)
C_TC: imgui.ImVec4 = vec4(255, 180, 80)
C_TR: imgui.ImVec4 = vec4(180, 220, 255)
C_TRS: imgui.ImVec4 = vec4(200, 180, 255)
C_LBL: imgui.ImVec4 = vec4(180, 180, 180)
C_VAL: imgui.ImVec4 = vec4(220, 220, 220)
C_KEY: imgui.ImVec4 = vec4(140, 200, 255)
C_NUM: imgui.ImVec4 = vec4(180, 255, 180)
C_SUB: imgui.ImVec4 = vec4(220, 200, 120)
# Colour lookup keyed by direction tag ("OUT" / "IN").
DIR_COLORS: dict[str, imgui.ImVec4] = {"OUT": C_OUT, "IN": C_IN}
# Colour lookup keyed by entry kind.
KIND_COLORS: dict[str, imgui.ImVec4] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
# NOTE(review): presumably the payload keys whose values are rendered through
# the heavy-text path; the consumer is outside this view - confirm.
HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"}
def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]:
    """Keep only the tail of *entries* that holds the last *max_pairs* exchanges.

    Walking backwards, entries whose "role" is "User" or "AI" are counted; the
    slice starting at the entry that brings the count to ``max_pairs * 2`` is
    returned. Entries with any other role are carried along but not counted.

    Args:
        entries: History entries, oldest first.
        max_pairs: Number of User/AI pairs to keep.

    Returns:
        ``[]`` when *max_pairs* is not positive, *entries* itself when it holds
        fewer counted entries than requested, otherwise a new tail slice.
    """
    if max_pairs <= 0:
        return []
    remaining = max_pairs * 2
    for idx in reversed(range(len(entries))):
        if entries[idx].get("role", "") not in ("User", "AI"):
            continue
        remaining -= 1
        if remaining == 0:
            return entries[idx:]
    return entries
# Request model: a required prompt plus optional generation settings.
# NOTE(review): presumably the body of an API "generate" endpoint defined
# outside this view - confirm.
class GenerateRequest(BaseModel):
prompt: str
# Defaults to True when the field is omitted.
auto_add_history: bool = True
# None when the field is omitted.
temperature: float | None = None
max_tokens: int | None = None
# Request model carrying an approval decision and an optional script.
# NOTE(review): presumably used to confirm or reject a pending script over the
# API; the consumer is outside this view - confirm.
class ConfirmRequest(BaseModel):
approved: bool
# None when the field is omitted.
script: Optional[str] = None
class App:
"""The main ImGui interface orchestrator for Manual Slop."""
def __init__(self) -> None:
    """Create the controller, start its services and build the UI action tables."""
    # The controller is created first: attribute access on App falls through
    # to it (see __getattr__ / __setattr__).
    self.controller = AppController()
    self.controller.init_state()
    self.controller.start_services()
    # Aliases for controller-owned locks, assigned under the same names.
    for lock_name in (
        "_send_thread_lock",
        "_disc_entries_lock",
        "_pending_comms_lock",
        "_pending_tool_calls_lock",
        "_pending_history_adds_lock",
        "_pending_gui_tasks_lock",
        "_pending_dialog_lock",
        "_api_event_queue_lock",
    ):
        setattr(self, lock_name, getattr(self.controller, lock_name))
    # UI-specific initialization
    self._init_ui_actions()
def _init_ui_actions(self) -> None:
# Set up UI-specific action maps
self._clickable_actions: dict[str, Callable[..., Any]] = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_tool,
'btn_approve_mma_step': self._handle_approve_mma_step,
'btn_approve_spawn': self._handle_approve_spawn,
}
self._predefined_callbacks: dict[str, Callable[..., Any]] = {
'_test_callback_func_write_to_file': self._test_callback_func_write_to_file
}
self._discussion_names_cache: list[str] = []
self._discussion_names_dirty: bool = True
def __getattr__(self, name: str) -> Any:
if name != 'controller' and hasattr(self, 'controller') and hasattr(self.controller, name):
return getattr(self.controller, name)
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __setattr__(self, name: str, value: Any) -> None:
if name == 'controller':
super().__setattr__(name, value)
elif hasattr(self, 'controller') and hasattr(self.controller, name):
setattr(self.controller, name, value)
else:
super().__setattr__(name, value)
@property
def current_provider(self) -> str:
return self.controller.current_provider
@current_provider.setter
def current_provider(self, value: str) -> None:
self.controller.current_provider = value
@property
def current_model(self) -> str:
return self.controller.current_model
@current_model.setter
def current_model(self, value: str) -> None:
self.controller.current_model = value
# ---------------------------------------------------------------- project loading
# ---------------------------------------------------------------- logic
def _process_pending_gui_tasks(self) -> None:
"""Drain the pending GUI task queue and apply each task.

Tasks are dicts dispatched on their "action" key (ask requests are
recognised by "type" == "ask" instead). An exception raised while
handling one task is printed and the remaining tasks are still processed.
"""
# Unlocked emptiness check so the lock is not taken when nothing is queued.
if not self._pending_gui_tasks:
return
# Snapshot and clear the queue under its lock.
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}), md_content=self.last_md or None)
elif action == "handle_ai_response":
payload = task.get("payload", {})
text = payload.get("text", "")
stream_id = payload.get("stream_id")
# A status of exactly "streaming..." marks a chunk to append rather than a full replacement.
is_streaming = payload.get("status") == "streaming..."
if stream_id:
if is_streaming:
if stream_id not in self.mma_streams: self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
else:
self.mma_streams[stream_id] = text
if stream_id == "Tier 1":
if "status" in payload:
self.ai_status = payload["status"]
else:
if is_streaming:
self.ai_response += text
else:
self.ai_response = text
self.ai_status = payload.get("status", "done")
self._trigger_blink = True
if not stream_id:
self._token_stats_dirty = True
if self.ui_auto_add_history and not stream_id:
role = payload.get("role", "AI")
# Queued here; _process_pending_history_adds moves it into the discussion.
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": self.ai_response,
"collapsed": False,
"ts": project_manager.now_ts()
})
elif action == "mma_stream_append":
payload = task.get("payload", {})
stream_id = payload.get("stream_id")
text = payload.get("text", "")
if stream_id:
if stream_id not in self.mma_streams:
self.mma_streams[stream_id] = ""
self.mma_streams[stream_id] += text
elif action == "show_track_proposal":
self.proposed_tracks = task.get("payload", [])
self._show_track_proposal_modal = True
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
# Keeps the previous usage figures when the payload carries none.
self.mma_tier_usage = payload.get("tier_usage", self.mma_tier_usage)
self.active_tickets = payload.get("tickets", [])
track_data = payload.get("track")
if track_data:
# Rebuild Ticket objects from the ticket dicts in the payload.
tickets = []
for t_data in self.active_tickets:
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=track_data.get("id"),
description=track_data.get("title", ""),
tickets=tickets
)
elif action == "set_value":
item = task.get("item")
value = task.get("value")
# Only items listed in _settable_fields may be written.
if item in self._settable_fields:
attr_name = self._settable_fields[item]
setattr(self, attr_name, value)
if item == "gcli_path":
# Create the Gemini CLI adapter if missing, otherwise update its binary path.
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(value))
else:
ai_client._gemini_cli_adapter.binary_path = str(value)
elif action == "click":
item = task.get("item")
user_data = task.get("user_data")
if item == "btn_project_new_automated":
self._cb_new_project_automated(user_data)
elif item == "btn_mma_load_track":
self._cb_load_track(str(user_data or ""))
elif item in self._clickable_actions:
# Check if it's a method that accepts user_data
# NOTE(review): inspect is imported on every click task; a module-level import would avoid that.
import inspect
func = self._clickable_actions[item]
# NOTE(review): the except below also covers the handler call itself, so a
# handler that raises is invoked a second time without arguments - confirm
# this is intended rather than a guard meant only for inspect.signature().
try:
sig = inspect.signature(func)
if 'user_data' in sig.parameters:
func(user_data=user_data)
else:
func()
except Exception:
func()
elif action == "select_list_item":
# Accepts either the "listbox"/"item_value" or the "item"/"value" key spelling.
item = task.get("listbox", task.get("item"))
value = task.get("item_value", task.get("value"))
if item == "disc_listbox":
self._switch_discussion(str(value or ""))
# Ask requests are identified by "type" rather than "action".
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "clear_ask":
# Only clears the dialog if it belongs to the same request.
if self._ask_request_id == task.get("request_id"):
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
# A callable is invoked directly; otherwise cb is looked up by name in _predefined_callbacks.
if callable(cb):
try: cb(*args)
except Exception as e: print(f"Error in direct custom callback: {e}")
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or ""))
self._pending_mma_approval = task
# The dialog object is handed back through slot 0 of the task's container, when one is supplied.
if "dialog_container" in task:
task["dialog_container"][0] = dlg
elif action == 'refresh_from_project':
self._refresh_from_project()
elif action == "mma_spawn_approval":
spawn_dlg = MMASpawnApprovalDialog(
str(task.get("ticket_id") or ""),
str(task.get("role") or ""),
str(task.get("prompt") or ""),
str(task.get("context_md") or "")
)
self._pending_mma_spawn = task
self._mma_spawn_prompt = task.get("prompt", "")
self._mma_spawn_context = task.get("context_md", "")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
if "dialog_container" in task:
task["dialog_container"][0] = spawn_dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
def _process_pending_history_adds(self) -> None:
"""Synchronizes pending history entries to the active discussion and project state."""
# Snapshot and clear the pending list under its lock.
with self._pending_history_adds_lock:
items = self._pending_history_adds[:]
self._pending_history_adds.clear()
if not items:
return
self._scroll_disc_to_bottom = True
for item in items:
# NOTE(review): the result of this lookup is discarded; the call has no effect.
item.get("role", "unknown")
# Register roles that are not yet in disc_roles.
if item.get("role") and item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
disc_data = discussions.get(self.active_discussion)
if disc_data is not None:
# An item without "disc_title" is treated as belonging to the active discussion.
if item.get("disc_title", self.active_discussion) == self.active_discussion:
# The stored history is appended to only when it is a different object from disc_entries.
if self.disc_entries is not disc_data.get("history"):
if "history" not in disc_data:
disc_data["history"] = []
disc_data["history"].append(project_manager.entry_to_str(item))
disc_data["last_updated"] = project_manager.now_ts()
with self._disc_entries_lock:
self.disc_entries.append(item)
def shutdown(self) -> None:
"""Cleanly shuts down the app's background tasks and saves state."""
self.controller.stop_services()
# Join other threads if they exist
if self.send_thread and self.send_thread.is_alive():
self.send_thread.join(timeout=1.0)
if self.models_thread and self.models_thread.is_alive():
self.models_thread.join(timeout=1.0)
# Final State persistence
try:
ai_client.cleanup() # Destroy active API caches to stop billing
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except: pass
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""
import os
# Ensure the directory exists if running from a different cwd
os.makedirs("tests/artifacts", exist_ok=True)
with open("tests/artifacts/temp_callback_output.txt", "w") as f:
f.write(data)
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str) -> None:
    """Draw a "[+]" button that opens the text viewer window on *content*.

    The button's ImGui id is derived from ``id(content)``.
    """
    button_id = "[+]##" + str(id(content))
    if not imgui.button(button_id):
        return
    self.show_text_viewer = True
    self.text_viewer_title = label
    self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str) -> None:
    """Render a labelled text value together with a "[+]" viewer button.

    Content longer than COMMS_CLAMP_CHARS is drawn wrapped when word wrap is
    on, otherwise inside a fixed-height child holding a read-only multiline
    input. Shorter content is drawn inline, with "(empty)" standing in for an
    empty value.
    """
    imgui.text_colored(C_LBL, f"{label}:")
    imgui.same_line()
    open_viewer = imgui.button("[+]##" + label)
    if open_viewer:
        self.show_text_viewer = True
        self.text_viewer_title = label
        self.text_viewer_content = content

    is_long = len(content) > COMMS_CLAMP_CHARS
    wrap = self.ui_word_wrap
    if is_long and not wrap:
        # Long and unwrapped: scrollable read-only box.
        imgui.begin_child(f"heavy_text_child_{label}", imgui.ImVec2(0, 80), True)
        imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
        imgui.end_child()
        return

    # Only the short path substitutes the "(empty)" placeholder.
    shown = content if is_long else (content if content else "(empty)")
    if wrap:
        imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
    imgui.text(shown)
    if wrap:
        imgui.pop_text_wrap_pos()
# ---------------------------------------------------------------- gui
def _show_menus(self) -> None:
    """Draw the "manual slop", "Windows" and "Project" menus and act on clicks."""
    if imgui.begin_menu("manual slop"):
        quit_clicked, _ = imgui.menu_item("Quit", "Ctrl+Q", False)
        if quit_clicked:
            self.runner_params.app_shall_exit = True
        imgui.end_menu()

    if imgui.begin_menu("Windows"):
        # One checkable entry per window; the returned state is written back.
        for window_name in self.show_windows.keys():
            _, self.show_windows[window_name] = imgui.menu_item(window_name, "", self.show_windows[window_name])
        imgui.end_menu()

    if not imgui.begin_menu("Project"):
        return

    save_clicked, _ = imgui.menu_item("Save All", "", False)
    if save_clicked:
        self._flush_to_project()
        self._save_active_project()
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "config saved"

    reset_clicked, _ = imgui.menu_item("Reset Session", "", False)
    if reset_clicked:
        ai_client.reset_session()
        ai_client.clear_comms_log()
        self._tool_log.clear()
        self._comms_log.clear()
        self.ai_status = "session reset"
        self.ai_response = ""

    md_clicked, _ = imgui.menu_item("Generate MD Only", "", False)
    if md_clicked:
        try:
            generated = self._do_generate()
            md, path, *_ = generated
            self.last_md = md
            self.last_md_path = path
            self.ai_status = f"md written: {path.name}"
        except Exception as e:
            # Surface the failure in the status line.
            self.ai_status = f"error: {e}"

    imgui.end_menu()
def _gui_func(self) -> None:
"""Render one GUI frame.

Drains queued GUI tasks, autosaves on an interval, moves pending comms and
tool-call entries into their logs, draws every enabled window, then drives
the approval modals and the script-output / text-viewer windows. Any
exception raised along the way is printed with a traceback instead of
propagating.
"""
try:
self.perf_monitor.start_frame()
# Process GUI task queue
self._process_pending_gui_tasks()
self._render_track_proposal_modal()
# Auto-save (every 60s)
now = time.time()
if now - self._last_autosave >= self._autosave_interval:
self._last_autosave = now
try:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except Exception:
pass # silent — don't disrupt the GUI loop
# Sync pending comms
with self._pending_comms_lock:
if self._pending_comms and self.ui_auto_scroll_comms:
self._scroll_comms_to_bottom = True
for c in self._pending_comms:
self._comms_log.append(c)
self._pending_comms.clear()
# Sync pending tool calls the same way
with self._pending_tool_calls_lock:
if self._pending_tool_calls and self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
for tc in self._pending_tool_calls:
self._tool_log.append(tc)
self._pending_tool_calls.clear()
# ---- Windows: each is drawn only while its show_windows flag is set, and the
# flag is refreshed from the open state returned by imgui.begin().
if self.show_windows.get("Context Hub", False):
exp, opened = imgui.begin("Context Hub", self.show_windows["Context Hub"])
self.show_windows["Context Hub"] = bool(opened)
if exp:
self._render_projects_panel()
imgui.end()
if self.show_windows.get("Files & Media", False):
exp, opened = imgui.begin("Files & Media", self.show_windows["Files & Media"])
self.show_windows["Files & Media"] = bool(opened)
if exp:
if imgui.collapsing_header("Files"):
self._render_files_panel()
if imgui.collapsing_header("Screenshots"):
self._render_screenshots_panel()
imgui.end()
if self.show_windows.get("AI Settings", False):
exp, opened = imgui.begin("AI Settings", self.show_windows["AI Settings"])
self.show_windows["AI Settings"] = bool(opened)
if exp:
if imgui.collapsing_header("Provider & Model"):
self._render_provider_panel()
if imgui.collapsing_header("System Prompts"):
self._render_system_prompts_panel()
if imgui.collapsing_header("Token Budget"):
self._render_token_budget_panel()
imgui.end()
if self.show_windows.get("MMA Dashboard", False):
exp, opened = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"])
self.show_windows["MMA Dashboard"] = bool(opened)
if exp:
self._render_mma_dashboard()
imgui.end()
# Tier stream windows share _render_tier_stream_panel; Tier 3 passes None as its second argument.
if self.show_windows.get("Tier 1: Strategy", False):
exp, opened = imgui.begin("Tier 1: Strategy", self.show_windows["Tier 1: Strategy"])
self.show_windows["Tier 1: Strategy"] = bool(opened)
if exp:
self._render_tier_stream_panel("Tier 1", "Tier 1")
imgui.end()
if self.show_windows.get("Tier 2: Tech Lead", False):
exp, opened = imgui.begin("Tier 2: Tech Lead", self.show_windows["Tier 2: Tech Lead"])
self.show_windows["Tier 2: Tech Lead"] = bool(opened)
if exp:
self._render_tier_stream_panel("Tier 2", "Tier 2 (Tech Lead)")
imgui.end()
if self.show_windows.get("Tier 3: Workers", False):
exp, opened = imgui.begin("Tier 3: Workers", self.show_windows["Tier 3: Workers"])
self.show_windows["Tier 3: Workers"] = bool(opened)
if exp:
self._render_tier_stream_panel("Tier 3", None)
imgui.end()
if self.show_windows.get("Tier 4: QA", False):
exp, opened = imgui.begin("Tier 4: QA", self.show_windows["Tier 4: QA"])
self.show_windows["Tier 4: QA"] = bool(opened)
if exp:
self._render_tier_stream_panel("Tier 4", "Tier 4 (QA)")
imgui.end()
if self.show_windows.get("Theme", False):
self._render_theme_panel()
if self.show_windows.get("Discussion Hub", False):
exp, opened = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"])
self.show_windows["Discussion Hub"] = bool(opened)
if exp:
# Top part for the history
imgui.begin_child("HistoryChild", size=(0, -200))
self._render_discussion_panel()
imgui.end_child()
# Bottom part with tabs for message and response
if imgui.begin_tab_bar("MessageResponseTabs"):
if imgui.begin_tab_item("Message")[0]:
self._render_message_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Response")[0]:
self._render_response_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
if self.show_windows.get("Operations Hub", False):
exp, opened = imgui.begin("Operations Hub", self.show_windows["Operations Hub"])
self.show_windows["Operations Hub"] = bool(opened)
if exp:
imgui.text("Focus Agent:")
imgui.same_line()
# ui_focus_agent is None for "All", otherwise one of the tier names below.
focus_label = self.ui_focus_agent or "All"
if imgui.begin_combo("##focus_agent", focus_label, imgui.ComboFlags_.width_fit_preview):
if imgui.selectable("All", self.ui_focus_agent is None)[0]:
self.ui_focus_agent = None
for tier in ["Tier 2", "Tier 3", "Tier 4"]:
if imgui.selectable(tier, self.ui_focus_agent == tier)[0]:
self.ui_focus_agent = tier
imgui.end_combo()
imgui.same_line()
if self.ui_focus_agent:
if imgui.button("x##clear_focus"):
self.ui_focus_agent = None
imgui.separator()
if imgui.begin_tab_bar("OperationsTabs"):
if imgui.begin_tab_item("Tool Calls")[0]:
self._render_tool_calls_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Comms History")[0]:
self._render_comms_history_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
if self.show_windows.get("Log Management", False):
self._render_log_management()
# NOTE(review): indexed directly (KeyError if the key is absent), unlike the
# .get(..., False) lookups used for the other windows - confirm the key always exists.
if self.show_windows["Diagnostics"]:
exp, opened = imgui.begin("Diagnostics", self.show_windows["Diagnostics"])
self.show_windows["Diagnostics"] = bool(opened)
if exp:
now = time.time()
# Refresh the rolling histories at most every 0.5 s: drop the oldest sample, append the newest.
if now - self._perf_last_update >= 0.5:
self._perf_last_update = now
metrics = self.perf_monitor.get_metrics()
self.perf_history["frame_time"].pop(0)
self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0))
self.perf_history["fps"].pop(0)
self.perf_history["fps"].append(metrics.get("fps", 0.0))
self.perf_history["cpu"].pop(0)
self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0))
self.perf_history["input_lag"].pop(0)
self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0))
metrics = self.perf_monitor.get_metrics()
imgui.text("Performance Telemetry")
imgui.separator()
if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h):
imgui.table_setup_column("Metric")
imgui.table_setup_column("Value")
imgui.table_headers_row()
imgui.table_next_row()
imgui.table_next_column()
imgui.text("FPS")
imgui.table_next_column()
imgui.text(f"{metrics.get('fps', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Frame Time (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("CPU %")
imgui.table_next_column()
imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Input Lag (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}")
imgui.end_table()
imgui.separator()
imgui.text("Frame Time (ms)")
imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60))
imgui.text("CPU %")
imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60))
imgui.end()
# NOTE(review): pairs with start_frame() at the top; it is not reached when an
# exception is raised earlier in the frame - confirm that is acceptable.
self.perf_monitor.end_frame()
# ---- Modals / Popups
# Read the pending dialog under its lock, then work with the local reference.
with self._pending_dialog_lock:
dlg = self._pending_dialog
# open_popup is issued once per pending dialog; the flag resets when none is pending.
if dlg:
if not self._pending_dialog_open:
imgui.open_popup("Approve PowerShell Command")
self._pending_dialog_open = True
else:
self._pending_dialog_open = False
if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not dlg:
imgui.close_current_popup()
else:
imgui.text("The AI wants to run the following PowerShell script:")
imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}")
imgui.separator()
# Checkbox to toggle full preview inside modal
_, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer)
if self.show_text_viewer:
imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(dlg._script)
imgui.end_child()
else:
# Edits made here are written back to the dialog's script.
ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200))
imgui.separator()
if imgui.button("Approve & Run", imgui.ImVec2(120, 0)):
# Record the decision and wake any waiter on the dialog's condition.
with dlg._condition:
dlg._approved = True
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(120, 0)):
with dlg._condition:
dlg._approved = False
dlg._done = True
dlg._condition.notify_all()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.end_popup()
# Tool execution ("ask") approval modal, opened once per pending request.
if self._pending_ask_dialog:
if not self._ask_dialog_open:
imgui.open_popup("Approve Tool Execution")
self._ask_dialog_open = True
else:
self._ask_dialog_open = False
if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_ask_dialog or self._ask_tool_data is None:
imgui.close_current_popup()
else:
tool_name = self._ask_tool_data.get("tool", "unknown")
tool_args = self._ask_tool_data.get("args", {})
imgui.text("The AI wants to execute a tool:")
imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}")
imgui.separator()
imgui.text("Arguments:")
imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True)
imgui.text_unformatted(json.dumps(tool_args, indent=2))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_approve_ask()
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Deny", imgui.ImVec2(120, 0)):
self._handle_reject_ask()
imgui.close_current_popup()
imgui.end_popup()
# MMA Step Approval Modal
if self._pending_mma_approval:
if not self._mma_approval_open:
imgui.open_popup("MMA Step Approval")
self._mma_approval_open = True
self._mma_approval_edit_mode = False
# Seed the editable copy from the pending task when the popup opens.
self._mma_approval_payload = self._pending_mma_approval.get("payload", "")
else:
self._mma_approval_open = False
if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_approval:
imgui.close_current_popup()
else:
ticket_id = self._pending_mma_approval.get("ticket_id", "??")
imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.")
imgui.separator()
if self._mma_approval_edit_mode:
imgui.text("Edit Raw Payload (Manual Memory Mutation):")
_, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400))
else:
imgui.text("Proposed Tool Call:")
imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(str(self._pending_mma_approval.get("payload", "")))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
# The editable copy is what gets sent, whether or not it was changed.
self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)):
self._mma_approval_edit_mode = not self._mma_approval_edit_mode
imgui.same_line()
if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False)
imgui.close_current_popup()
imgui.end_popup()
# MMA Spawn Approval Modal
if self._pending_mma_spawn:
if not self._mma_spawn_open:
imgui.open_popup("MMA Spawn Approval")
self._mma_spawn_open = True
self._mma_spawn_edit_mode = False
self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "")
self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "")
else:
self._mma_spawn_open = False
if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_spawn:
imgui.close_current_popup()
else:
role = self._pending_mma_spawn.get("role", "??")
ticket_id = self._pending_mma_spawn.get("ticket_id", "??")
imgui.text(f"Spawning {role} for Ticket {ticket_id}")
imgui.separator()
if self._mma_spawn_edit_mode:
imgui.text("Edit Prompt:")
_, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200))
imgui.text("Edit Context MD:")
_, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300))
else:
imgui.text("Proposed Prompt:")
imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True)
imgui.text_unformatted(self._mma_spawn_prompt)
imgui.end_child()
imgui.text("Proposed Context MD:")
imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True)
imgui.text_unformatted(self._mma_spawn_context)
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)):
self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode
imgui.same_line()
if imgui.button("Abort", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False, abort=True)
imgui.close_current_popup()
imgui.end_popup()
# ---- Last Script Output window
if self.show_script_output:
# A blink request starts the timer and tries to focus the window.
if self._trigger_script_blink:
self._trigger_script_blink = False
self._is_script_blinking = True
self._script_blink_start_time = time.time()
try:
imgui.set_window_focus("Last Script Output") # type: ignore[call-arg]
except Exception:
pass
if self._is_script_blinking:
elapsed = time.time() - self._script_blink_start_time
# Blinking stops after 1.5 s; until then the tint is toggled by the sign of a sine wave.
if elapsed > 1.5:
self._is_script_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 60/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha))
imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever)
expanded, opened = imgui.begin("Last Script Output", self.show_script_output)
self.show_script_output = bool(opened)
if expanded:
imgui.text("Script:")
imgui.same_line()
self._render_text_viewer("Last Script", self.ui_last_script_text)
if self.ui_word_wrap:
imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_text)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.text("Output:")
imgui.same_line()
self._render_text_viewer("Last Output", self.ui_last_script_output)
if self.ui_word_wrap:
imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_output)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
# Pops the two colours pushed above; the flag is cleared in the branch that skips the push.
if self._is_script_blinking:
imgui.pop_style_color(2)
imgui.end()
# ---- Text Viewer window
if self.show_text_viewer:
imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever)
expanded, opened = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer)
self.show_text_viewer = bool(opened)
if expanded:
if self.ui_word_wrap:
imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.text_viewer_content)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end()
except Exception as e:
# Report the failure and let the GUI loop carry on.
print(f"ERROR in _gui_func: {e}")
import traceback
traceback.print_exc()
def _render_projects_panel(self) -> None:
    """Render the Projects panel.

    Draws, top to bottom: the active project name, the Git directory / main
    context file / output directory inputs (each with a Browse button), the
    list of known project files, the Add / New / Save All buttons, the UI
    option checkboxes, the per-tool enable toggles and the MMA epic input.
    """

    def ask(dialog, **options):
        # Show a Tk file dialog on top of a hidden root window and always
        # destroy that root afterwards, even when the dialog itself raises.
        root = hide_tk_root()
        try:
            return dialog(**options)
        finally:
            root.destroy()

    proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem)
    imgui.text_colored(C_IN, f"Active: {proj_name}")
    imgui.separator()

    imgui.text("Git Directory")
    _, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir)
    imgui.same_line()
    if imgui.button("Browse##git"):
        chosen = ask(filedialog.askdirectory, title="Select Git Directory")
        if chosen:
            self.ui_project_git_dir = chosen
    imgui.separator()

    imgui.text("Main Context File")
    _, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context)
    imgui.same_line()
    if imgui.button("Browse##ctx"):
        chosen = ask(filedialog.askopenfilename, title="Select Main Context File")
        if chosen:
            self.ui_project_main_context = chosen
    imgui.separator()

    imgui.text("Output Dir")
    _, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir)
    imgui.same_line()
    if imgui.button("Browse##out"):
        chosen = ask(filedialog.askdirectory, title="Select Output Dir")
        if chosen:
            self.ui_output_dir = chosen
    imgui.separator()

    imgui.text("Project Files")
    imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True)
    for i, pp in enumerate(self.project_paths):
        # Snapshot: switching projects below must not unbalance the
        # push/pop of the highlight colour within this row.
        is_active = (pp == self.active_project_path)
        if imgui.button(f"x##p{i}"):
            removed = self.project_paths.pop(i)
            if removed == self.active_project_path and self.project_paths:
                self._switch_project(self.project_paths[0])
            # The list changed under the iterator; stop drawing rows this frame.
            break
        imgui.same_line()
        marker = " *" if is_active else ""
        if is_active:
            imgui.push_style_color(imgui.Col_.text, C_IN)
        if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"):
            self._switch_project(pp)
        if is_active:
            imgui.pop_style_color()
        imgui.same_line()
        imgui.text_colored(C_LBL, pp)
    imgui.end_child()

    if imgui.button("Add Project"):
        chosen = ask(
            filedialog.askopenfilename,
            title="Select Project .toml",
            filetypes=[("TOML", "*.toml"), ("All", "*.*")],
        )
        if chosen and chosen not in self.project_paths:
            self.project_paths.append(chosen)
    imgui.same_line()
    if imgui.button("New Project"):
        chosen = ask(
            filedialog.asksaveasfilename,
            title="Create New Project .toml",
            defaultextension=".toml",
            filetypes=[("TOML", "*.toml"), ("All", "*.*")],
        )
        if chosen:
            # The new project is named after the file stem and becomes active.
            proj = project_manager.default_project(Path(chosen).stem)
            project_manager.save_project(proj, chosen)
            if chosen not in self.project_paths:
                self.project_paths.append(chosen)
            self._switch_project(chosen)
    imgui.same_line()
    if imgui.button("Save All"):
        self._flush_to_project()
        self._save_active_project()
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "config saved"

    _, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap)
    _, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only)
    _, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms)
    _, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls)

    if imgui.collapsing_header("Agent Tools"):
        for t_name in AGENT_TOOL_NAMES:
            # Tools that have no stored setting are shown as enabled.
            enabled = self.ui_agent_tools.get(t_name, True)
            toggled, enabled = imgui.checkbox(f"Enable {t_name}", enabled)
            if toggled:
                self.ui_agent_tools[t_name] = enabled

    imgui.separator()
    imgui.text_colored(C_LBL, 'MMA Orchestration')
    _, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80))
    if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)):
        self._cb_plan_epic()
def _render_track_proposal_modal(self) -> None:
    """Render the "Track Proposal" modal for reviewing proposed tracks.

    While ``self._show_track_proposal_modal`` is set the popup is (re)opened
    every frame. Each proposed track can have its title and goal edited, be
    removed, or be started on its own; Accept hands the list to
    ``_cb_accept_tracks`` and Cancel just dismisses the modal.
    """
    if self._show_track_proposal_modal:
        imgui.open_popup("Track Proposal")
    visible, still_open = imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)
    if not visible:
        # Passing p_open=True gives the modal a close button. When it is
        # used the flag must be cleared too, otherwise open_popup() above
        # would immediately reopen the modal on the next frame.
        if still_open is False:
            self._show_track_proposal_modal = False
        # end_popup() must only be called when begin_popup_modal() succeeded.
        return
    if not self._show_track_proposal_modal:
        # The flag was cleared elsewhere while the popup was still open.
        imgui.close_current_popup()
        imgui.end_popup()
        return
    imgui.text_colored(C_IN, "Proposed Implementation Tracks")
    imgui.separator()
    if not self.proposed_tracks:
        imgui.text("No tracks generated.")
    else:
        for idx, track in enumerate(self.proposed_tracks):
            # Title Edit
            changed_t, new_t = imgui.input_text(f"Title##{idx}", track.get('title', ''))
            if changed_t:
                track['title'] = new_t
            # Goal Edit
            changed_g, new_g = imgui.input_text_multiline(f"Goal##{idx}", track.get('goal', ''), imgui.ImVec2(-1, 60))
            if changed_g:
                track['goal'] = new_g
            # Buttons
            if imgui.button(f"Remove##{idx}"):
                self.proposed_tracks.pop(idx)
                # The list changed under the iterator; redraw next frame.
                break
            imgui.same_line()
            if imgui.button(f"Start This Track##{idx}"):
                self._cb_start_track(idx)
            imgui.separator()
    if imgui.button("Accept", imgui.ImVec2(120, 0)):
        self._cb_accept_tracks()
        self._show_track_proposal_modal = False
        imgui.close_current_popup()
    imgui.same_line()
    if imgui.button("Cancel", imgui.ImVec2(120, 0)):
        self._show_track_proposal_modal = False
        imgui.close_current_popup()
    imgui.end_popup()
def _render_log_management(self) -> None:
    """Render the "Log Management" window.

    Shows one table row per session found in the log registry, with its
    start time, star (whitelist) state, reason, size and message count,
    plus a button that toggles the star state through the registry.
    """
    expanded, opened = imgui.begin("Log Management", self.show_windows["Log Management"])
    self.show_windows["Log Management"] = bool(opened)
    if expanded:
        registry = LogRegistry("logs/log_registry.toml")
        sessions = registry.data
        table_flags = imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable
        if imgui.begin_table("sessions_table", 7, table_flags):
            for heading in ("Session ID", "Start Time", "Star", "Reason", "Size (KB)", "Msgs", "Actions"):
                imgui.table_setup_column(heading)
            imgui.table_headers_row()
            for session_id, s_data in sessions.items():
                imgui.table_next_row()
                imgui.table_next_column()
                imgui.text(session_id)
                imgui.table_next_column()
                imgui.text(s_data.get("start_time", ""))
                imgui.table_next_column()
                starred = s_data.get("whitelisted", False)
                if starred:
                    imgui.text_colored(vec4(255, 215, 0), "YES")
                else:
                    imgui.text("NO")
                meta = s_data.get("metadata") or {}
                cells = (
                    meta.get("reason", ""),
                    str(meta.get("size_kb", "")),
                    str(meta.get("message_count", "")),
                )
                for cell in cells:
                    imgui.table_next_column()
                    imgui.text(cell)
                imgui.table_next_column()
                # One button toggles the star; unstarring keeps the stored
                # reason, starring records a fixed one.
                label = f"Unstar##{session_id}" if starred else f"Star##{session_id}"
                if imgui.button(label):
                    registry.update_session_metadata(
                        session_id,
                        message_count=int(meta.get("message_count") or 0),
                        errors=int(meta.get("errors") or 0),
                        size_kb=int(meta.get("size_kb") or 0),
                        whitelisted=not starred,
                        reason=str(meta.get("reason") or "") if starred else "Manually whitelisted",
                    )
            imgui.end_table()
    # end() pairs with begin() whether or not the window is expanded.
    imgui.end()
def _render_files_panel(self) -> None:
    """Render the Files panel: base directory plus the list of file paths.

    Paths can be removed individually, added through a multi-select file
    dialog, or added as a recursive ``<dir>/**/*`` wildcard. Entries already
    present in ``self.files`` are never added a second time.
    """

    def ask(dialog, **options):
        # Run a Tk dialog behind a hidden root and always destroy that
        # root, even when the dialog raises.
        root = hide_tk_root()
        try:
            return dialog(**options)
        finally:
            root.destroy()

    imgui.text("Base Dir")
    _, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
    imgui.same_line()
    if imgui.button("Browse##fb"):
        chosen = ask(filedialog.askdirectory)
        if chosen:
            self.ui_files_base_dir = chosen
    imgui.separator()
    imgui.text("Paths")
    imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True)
    for i, f in enumerate(self.files):
        if imgui.button(f"x##f{i}"):
            self.files.pop(i)
            # The list changed under the iterator; redraw next frame.
            break
        imgui.same_line()
        imgui.text(f)
    imgui.end_child()
    if imgui.button("Add File(s)"):
        for p in ask(filedialog.askopenfilenames):
            if p not in self.files:
                self.files.append(p)
    imgui.same_line()
    if imgui.button("Add Wildcard"):
        chosen = ask(filedialog.askdirectory)
        if chosen:
            pattern = str(Path(chosen) / "**" / "*")
            # Same duplicate guard as "Add File(s)".
            if pattern not in self.files:
                self.files.append(pattern)
def _render_screenshots_panel(self) -> None:
    """Render the Screenshots panel: base directory plus the image path list.

    Paths can be removed individually or added through a multi-select file
    dialog filtered to common image extensions; duplicates are skipped.
    """
    imgui.text("Base Dir")
    _, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
    imgui.same_line()
    if imgui.button("Browse##sb"):
        root = hide_tk_root()
        try:
            chosen = filedialog.askdirectory()
        finally:
            # Destroy the hidden Tk root even when the dialog raises.
            root.destroy()
        if chosen:
            self.ui_shots_base_dir = chosen
    imgui.separator()
    imgui.text("Paths")
    imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True)
    for i, s in enumerate(self.screenshots):
        if imgui.button(f"x##s{i}"):
            self.screenshots.pop(i)
            # The list changed under the iterator; redraw next frame.
            break
        imgui.same_line()
        imgui.text(s)
    imgui.end_child()
    if imgui.button("Add Screenshot(s)"):
        root = hide_tk_root()
        try:
            paths = filedialog.askopenfilenames(
                title="Select Screenshots",
                filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")],
            )
        finally:
            root.destroy()
        for p in paths:
            if p not in self.screenshots:
                self.screenshots.append(p)
def _render_discussion_panel(self) -> None:
    """Render the Discussion panel.

    In prior-session mode only a read-only list of the loaded entries is
    shown. Otherwise the panel draws the discussion selector and its
    metadata, the entry toolbar, truncation controls, the role editor and
    the scrollable, editable list of discussion entries.
    """

    def one_line(text):
        # Collapse line breaks so a preview stays on a single row. The
        # literal two-character "\n" sequence is replaced as well, which is
        # what the previous implementation (only) did.
        return text.replace("\\n", " ").replace("\r", " ").replace("\n", " ")

    # THINKING indicator
    is_thinking = self.ai_status in ["sending..."]
    if is_thinking:
        val = math.sin(time.time() * 10 * math.pi)
        alpha = 1.0 if val > 0 else 0.0
        imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...")
        imgui.separator()

    # Prior session viewing mode
    if self.is_viewing_prior_session:
        imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20))
        imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
        imgui.same_line()
        if imgui.button("Exit Prior Session"):
            self.is_viewing_prior_session = False
            self.prior_session_entries.clear()
        imgui.separator()
        imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False)
        for idx, entry in enumerate(self.prior_session_entries):
            imgui.push_id(f"prior_{idx}")
            kind = entry.get("kind", entry.get("type", ""))
            imgui.text_colored(C_LBL, f"#{idx+1}")
            imgui.same_line()
            ts = entry.get("ts", entry.get("timestamp", ""))
            if ts:
                imgui.text_colored(vec4(160, 160, 160), str(ts))
                imgui.same_line()
            imgui.text_colored(C_KEY, str(kind))
            payload = entry.get("payload", entry)
            if not isinstance(payload, dict):
                # A null or scalar payload has no text fields of its own;
                # fall back to the entry's own fields instead of crashing.
                payload = entry
            text = payload.get("text", payload.get("message", payload.get("content", "")))
            if text:
                preview = one_line(str(text))[:200]
                if self.ui_word_wrap:
                    imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                    imgui.text(preview)
                    imgui.pop_text_wrap_pos()
                else:
                    imgui.text(preview)
            imgui.separator()
            imgui.pop_id()
        imgui.end_child()
        imgui.pop_style_color()
        return

    if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open):
        names = self._get_discussion_names()
        if imgui.begin_combo("##disc_sel", self.active_discussion):
            for name in names:
                is_selected = (name == self.active_discussion)
                if imgui.selectable(name, is_selected)[0]:
                    self._switch_discussion(name)
                if is_selected:
                    imgui.set_item_default_focus()
            imgui.end_combo()
        if self.active_track:
            imgui.same_line()
            changed, self._track_discussion_active = imgui.checkbox("Track Discussion", self._track_discussion_active)
            if changed:
                if self._track_discussion_active:
                    # Save the project discussion, then load the track's history.
                    self._flush_disc_entries_to_project()
                    history_strings = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
                    with self._disc_entries_lock:
                        self.disc_entries = parse_history_entries(history_strings, self.disc_roles)
                    self.ai_status = f"track discussion: {self.active_track.id}"
                else:
                    self._flush_disc_entries_to_project()
                    # Restore project discussion
                    self._switch_discussion(self.active_discussion)
        disc_sec = self.project.get("discussion", {})
        disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
        git_commit = disc_data.get("git_commit", "")
        last_updated = disc_data.get("last_updated", "")
        imgui.text_colored(C_LBL, "commit:")
        imgui.same_line()
        imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)")
        imgui.same_line()
        if imgui.button("Update Commit"):
            git_dir = self.ui_project_git_dir
            if git_dir:
                cmt = project_manager.get_git_commit(git_dir)
                if cmt:
                    disc_data["git_commit"] = cmt
                    disc_data["last_updated"] = project_manager.now_ts()
                    self.ai_status = f"commit: {cmt[:12]}"
        imgui.text_colored(C_LBL, "updated:")
        imgui.same_line()
        imgui.text_colored(C_SUB, last_updated if last_updated else "(never)")
        _, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input)
        imgui.same_line()
        if imgui.button("Create"):
            nm = self.ui_disc_new_name_input.strip()
            if nm:
                self._create_discussion(nm)
                self.ui_disc_new_name_input = ""
        imgui.same_line()
        if imgui.button("Rename"):
            nm = self.ui_disc_new_name_input.strip()
            if nm:
                self._rename_discussion(self.active_discussion, nm)
                self.ui_disc_new_name_input = ""
        imgui.same_line()
        if imgui.button("Delete"):
            self._delete_discussion(self.active_discussion)

    if not self.is_viewing_prior_session:
        imgui.separator()
        if imgui.button("+ Entry"):
            self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
        imgui.same_line()
        if imgui.button("-All"):
            for e in self.disc_entries:
                e["collapsed"] = True
        imgui.same_line()
        if imgui.button("+All"):
            for e in self.disc_entries:
                e["collapsed"] = False
        imgui.same_line()
        if imgui.button("Clear All"):
            self.disc_entries.clear()
        imgui.same_line()
        if imgui.button("Save"):
            self._flush_to_project()
            self._save_active_project()
            self._flush_to_config()
            save_config(self.config)
            self.ai_status = "discussion saved"
        _, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history)

        # Truncation controls
        imgui.text("Keep Pairs:")
        imgui.same_line()
        imgui.set_next_item_width(80)
        _, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1)
        if self.ui_disc_truncate_pairs < 1:
            self.ui_disc_truncate_pairs = 1
        imgui.same_line()
        if imgui.button("Truncate"):
            with self._disc_entries_lock:
                self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs)
            self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs"
        imgui.separator()

        if imgui.collapsing_header("Roles"):
            imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True)
            for i, r in enumerate(self.disc_roles):
                if imgui.button(f"x##r{i}"):
                    self.disc_roles.pop(i)
                    break
                imgui.same_line()
                imgui.text(r)
            imgui.end_child()
            _, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input)
            imgui.same_line()
            if imgui.button("Add"):
                r = self.ui_disc_new_role_input.strip()
                if r and r not in self.disc_roles:
                    self.disc_roles.append(r)
                    self.ui_disc_new_role_input = ""
        imgui.separator()

        imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False)
        # Structural edits are recorded here and applied after the clipper
        # loop: the clipper was started with the current length, so changing
        # the list mid-iteration could index past the end or draw an entry
        # twice.
        pending_insert = None
        pending_delete = None
        clipper = imgui.ListClipper()
        clipper.begin(len(self.disc_entries))
        while clipper.step():
            for i in range(clipper.display_start, clipper.display_end):
                entry = self.disc_entries[i]
                imgui.push_id(str(i))
                # Snapshots: the toggles below take effect on the next frame.
                collapsed = entry.get("collapsed", False)
                read_mode = entry.get("read_mode", False)
                if imgui.button("+" if collapsed else "-"):
                    entry["collapsed"] = not collapsed
                imgui.same_line()
                imgui.set_next_item_width(120)
                if imgui.begin_combo("##role", entry["role"]):
                    for r in self.disc_roles:
                        if imgui.selectable(r, r == entry["role"])[0]:
                            entry["role"] = r
                    imgui.end_combo()
                if not collapsed:
                    imgui.same_line()
                    if imgui.button("[Edit]" if read_mode else "[Read]"):
                        entry["read_mode"] = not read_mode
                ts_str = entry.get("ts", "")
                if ts_str:
                    imgui.same_line()
                    imgui.text_colored(vec4(120, 120, 100), str(ts_str))
                if collapsed:
                    imgui.same_line()
                    if imgui.button("Ins"):
                        pending_insert = i
                    imgui.same_line()
                    self._render_text_viewer(f"Entry #{i+1}", entry["content"])
                    imgui.same_line()
                    if imgui.button("Del"):
                        pending_delete = i
                    imgui.same_line()
                    preview = one_line(entry["content"])[:60]
                    if len(entry["content"]) > 60:
                        preview += "..."
                    imgui.text_colored(vec4(160, 160, 150), preview)
                if not collapsed:
                    if read_mode:
                        imgui.begin_child("read_content", imgui.ImVec2(0, 150), True)
                        if self.ui_word_wrap:
                            imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                        imgui.text(entry["content"])
                        if self.ui_word_wrap:
                            imgui.pop_text_wrap_pos()
                        imgui.end_child()
                    else:
                        _, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150))
                imgui.separator()
                imgui.pop_id()
        if pending_delete is not None:
            if pending_delete < len(self.disc_entries):
                self.disc_entries.pop(pending_delete)
        elif pending_insert is not None:
            self.disc_entries.insert(pending_insert, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
        if self._scroll_disc_to_bottom:
            imgui.set_scroll_here_y(1.0)
            self._scroll_disc_to_bottom = False
        imgui.end_child()
def _render_provider_panel(self) -> None:
    """Render the Provider panel.

    Draws the provider and model selectors, the generation parameters, the
    Gemini CLI settings (only for the ``gemini_cli`` provider) and a
    telemetry summary built from ``self.session_usage``.
    """
    imgui.text("Provider")
    if imgui.begin_combo("##prov", self.current_provider):
        for p in PROVIDERS:
            if imgui.selectable(p, p == self.current_provider)[0]:
                self.current_provider = p
        imgui.end_combo()
    imgui.separator()
    imgui.text("Model")
    imgui.same_line()
    if imgui.button("Fetch Models"):
        self._fetch_models(self.current_provider)
    if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)):
        for m in self.available_models:
            if imgui.selectable(m, m == self.current_model)[0]:
                self.current_model = m
        imgui.end_list_box()
    imgui.separator()
    imgui.text("Parameters")
    _, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f")
    _, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024)
    _, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024)

    if self.current_provider == "gemini_cli":
        imgui.separator()
        imgui.text("Gemini CLI")
        adapter = getattr(ai_client, "_gemini_cli_adapter", None)
        sid = "None"
        if adapter:
            sid = adapter.session_id or "None"
        imgui.text(f"Session ID: {sid}")
        if imgui.button("Reset CLI Session"):
            ai_client.reset_session()
        imgui.text("Binary Path")
        path_changed, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path)
        imgui.same_line()
        if imgui.button("Browse##gcli"):
            root = hide_tk_root()
            try:
                chosen = filedialog.askopenfilename(title="Select gemini CLI binary")
            finally:
                root.destroy()
            if chosen:
                self.ui_gemini_cli_path = chosen
                # A browsed path is a change too; previously only typed
                # edits were pushed to the adapter.
                path_changed = True
        if path_changed:
            # Re-read the adapter: "Reset CLI Session" above may have replaced it.
            adapter = getattr(ai_client, "_gemini_cli_adapter", None)
            if adapter:
                adapter.binary_path = self.ui_gemini_cli_path

    imgui.separator()
    imgui.text("Telemetry")
    usage = self.session_usage
    # Missing or null counters are displayed as zero instead of raising.
    in_tok = usage.get("input_tokens") or 0
    out_tok = usage.get("output_tokens") or 0
    total = in_tok + out_tok
    if total == 0 and (usage.get("total_tokens") or 0) > 0:
        total = usage["total_tokens"]
    imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {in_tok:,} Out: {out_tok:,})")
    last_latency = usage.get("last_latency") or 0.0
    if last_latency > 0:
        imgui.text_colored(C_LBL, f" Last Latency: {last_latency:.2f}s")
    cache_read = usage.get("cache_read_input_tokens") or 0
    if cache_read:
        cache_created = usage.get("cache_creation_input_tokens") or 0
        imgui.text_colored(C_LBL, f" Cache Read: {cache_read:,} Creation: {cache_created:,}")
    if self._gemini_cache_text:
        imgui.text_colored(C_SUB, self._gemini_cache_text)
def _render_token_budget_panel(self) -> None:
# Render the token-budget panel: a utilization progress bar, a
# per-component token table, a history-trim warning and a provider-specific
# cache status line. All figures are read from self._token_stats.
# NOTE(review): leading indentation is not visible in this copy of the
# file, so the nesting of some statements below cannot be confirmed.
# Recompute the stats at most once per dirty flag.
if self._token_stats_dirty:
self._token_stats_dirty = False
self._refresh_api_metrics({}, md_content=self._last_stable_md or None)
stats = self._token_stats
if not stats:
imgui.text_disabled("Token stats unavailable")
return
pct = stats.get("utilization_pct", 0.0)
current = stats.get("estimated_prompt_tokens", stats.get("total_tokens", 0))
limit = stats.get("max_prompt_tokens", 0)
# Fall back to a locally computed, non-negative headroom when none is supplied.
headroom = stats.get("headroom_tokens", max(0, limit - current))
# Bar colour by utilization: green below 50%, yellow below 80%, red otherwise.
if pct < 50.0:
color = imgui.ImVec4(0.2, 0.8, 0.2, 1.0)
elif pct < 80.0:
color = imgui.ImVec4(1.0, 0.8, 0.0, 1.0)
else:
color = imgui.ImVec4(1.0, 0.2, 0.2, 1.0)
imgui.push_style_color(imgui.Col_.plot_histogram, color)
imgui.progress_bar(pct / 100.0, imgui.ImVec2(-1, 0), f"{pct:.1f}%")
imgui.pop_style_color()
imgui.text_disabled(f"{current:,} / {limit:,} tokens ({headroom:,} remaining)")
sys_tok = stats.get("system_tokens", 0)
tool_tok = stats.get("tools_tokens", 0)
hist_tok = stats.get("history_tokens", 0)
# "or 1" keeps the percentage division below safe when all three counts are zero.
total_tok = sys_tok + tool_tok + hist_tok or 1
if imgui.begin_table("token_breakdown", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.sizing_fixed_fit):
imgui.table_setup_column("Component")
imgui.table_setup_column("Tokens")
imgui.table_setup_column("Pct")
imgui.table_headers_row()
for lbl, tok in [("System", sys_tok), ("Tools", tool_tok), ("History", hist_tok)]:
imgui.table_next_row()
imgui.table_set_column_index(0); imgui.text(lbl)
imgui.table_set_column_index(1); imgui.text(f"{tok:,}")
imgui.table_set_column_index(2); imgui.text(f"{tok / total_tok * 100:.0f}%")
imgui.end_table()
# Trim warning.
# NOTE(review): the trimmable-turns details that follow may be nested under
# this "would_trim" check — confirm against the indented source.
if stats.get("would_trim"):
imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.0, 1.0), "WARNING: Next call will trim history")
trimmable = stats.get("trimmable_turns", 0)
if trimmable:
imgui.text_disabled(f"Trimmable turns: {trimmable}")
msgs = stats.get("messages")
if msgs:
# List at most the first three messages flagged as trimmable.
shown = 0
for msg in msgs:
if shown >= 3:
break
if msg.get("trimmable"):
role = msg.get("role", "?")
toks = msg.get("tokens", 0)
imgui.text_disabled(f" [{role}] ~{toks:,} tokens")
shown += 1
imgui.separator()
# Provider-specific cache status, read from ai_client's module-level state.
if ai_client._provider == "gemini":
if ai_client._gemini_cache is not None:
# A missing creation timestamp yields an age of (approximately) zero.
age = time.time() - (ai_client._gemini_cache_created_at or time.time())
ttl = ai_client._GEMINI_CACHE_TTL
imgui.text_colored(C_LBL, f"Gemini Cache: ACTIVE | Age: {age:.0f}s / {ttl}s | Renews at: {ttl * 0.9:.0f}s")
else:
imgui.text_disabled("Gemini Cache: INACTIVE")
elif ai_client._provider == "anthropic":
with ai_client._anthropic_history_lock:
turns = len(ai_client._anthropic_history)
# Take the cache-read count from the most recent "response" entry in the comms log.
cache_reads = 0
for entry in reversed(ai_client.get_comms_log()):
if entry.get("kind") == "response":
cache_reads = (entry.get("payload") or {}).get("usage", {}).get("cache_read_input_tokens") or 0
break
imgui.text_disabled("Anthropic: 4-breakpoint ephemeral caching (auto-managed)")
imgui.text_disabled(f" {turns} history turns | Cache reads last call: {cache_reads:,}")
def _render_message_panel(self) -> None:
    """Render the Message panel: the prompt input and its action buttons.

    Ctrl+Enter behaves like "Gen + Send" and Ctrl+L clears the input. A send
    is only started while no previous send thread is still alive.
    """
    # LIVE indicator
    live_states = ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
    if self.ai_status in live_states:
        # A square-wave blink: fully visible or fully transparent.
        blink_on = math.sin(time.time() * 10 * math.pi) > 0
        imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, 1.0 if blink_on else 0.0), "LIVE")
        imgui.separator()

    _, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40))

    # Keyboard shortcuts
    io = imgui.get_io()
    ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter)
    ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l)
    if ctrl_l:
        self.ui_ai_input = ""
    imgui.separator()

    with self._send_thread_lock:
        send_busy = bool(self.send_thread and self.send_thread.is_alive())

    # The button is always submitted so it is drawn even while a send is busy.
    send_clicked = imgui.button("Gen + Send")
    if not send_busy and (send_clicked or ctrl_enter):
        self._handle_generate_send()
    imgui.same_line()
    if imgui.button("MD Only"):
        self._handle_md_only()
    imgui.same_line()
    if imgui.button("Reset"):
        self._handle_reset_session()
    imgui.same_line()
    if imgui.button("-> History") and self.ui_ai_input:
        self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()})
def _render_response_panel(self) -> None:
    """Render the Response panel showing the latest AI response.

    When ``self._trigger_blink`` is set the panel requests focus and its
    background blinks green for 1.5 seconds. The response is shown either
    word-wrapped or in a read-only multiline input, followed by a button
    that appends it to the discussion history.
    """
    if self._trigger_blink:
        self._trigger_blink = False
        self._is_blinking = True
        self._blink_start_time = time.time()
        try:
            imgui.set_window_focus("Response")  # type: ignore[call-arg]
        except Exception:
            # Focusing is best-effort. Only ordinary errors are swallowed so
            # KeyboardInterrupt / SystemExit still propagate.
            pass

    is_blinking = False
    if self._is_blinking:
        elapsed = time.time() - self._blink_start_time
        if elapsed > 1.5:
            self._is_blinking = False
        else:
            is_blinking = True
            val = math.sin(elapsed * 8 * math.pi)
            alpha = 50/255 if val > 0 else 0
            # Two colours are pushed here and popped together at the end.
            imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha))
            imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha))

    # --- Always Render Content ---
    if self.ui_word_wrap:
        imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True)
        imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
        imgui.text(self.ai_response)
        imgui.pop_text_wrap_pos()
        imgui.end_child()
    else:
        imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only)
    imgui.separator()
    if imgui.button("-> History"):
        if self.ai_response:
            self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
    if is_blinking:
        imgui.pop_style_color(2)
def _render_tool_calls_panel(self) -> None:
    """Render the tool-call history as a clipped, scrollable list.

    When ``self.ui_focus_agent`` is set, only log entries whose
    ``source_tier`` matches it are listed. Each entry shows its script and
    its output in fixed-height boxes, each with a "[+]" button that opens
    the full text in the text viewer.
    """
    imgui.text("Tool call history")
    imgui.same_line()
    if imgui.button("Clear##tc"):
        self._tool_log.clear()
    imgui.separator()
    imgui.begin_child("scroll_area")
    if self.ui_focus_agent:
        visible_log = [e for e in self._tool_log if e.get("source_tier") == self.ui_focus_agent]
    else:
        visible_log = self._tool_log
    clipper = imgui.ListClipper()
    clipper.begin(len(visible_log))
    while clipper.step():
        for row in range(clipper.display_start, clipper.display_end):
            # Calls are numbered from 1; the number is also used in widget IDs.
            i = row + 1
            entry = visible_log[row]
            script = entry["script"]
            result = entry["result"]
            first_line = script.split('\n')[0] if script else 'Empty Script'
            imgui.text_colored(C_KEY, f"Call #{i}: {first_line}")
            # Script Display
            # This label had been swallowed into the comment above, so the
            # "[+]" button ended up on the title line.
            imgui.text_colored(C_LBL, "Script:")
            imgui.same_line()
            if imgui.button(f"[+]##script_{i}"):
                self.show_text_viewer = True
                self.text_viewer_title = f"Call Script #{i}"
                self.text_viewer_content = script
            if self.ui_word_wrap:
                imgui.begin_child(f"tc_script_wrap_{i}", imgui.ImVec2(-1, 72), True)
                imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                imgui.text(script)
                imgui.pop_text_wrap_pos()
                imgui.end_child()
            else:
                imgui.begin_child(f"tc_script_fixed_width_{i}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar)
                imgui.input_text_multiline(f"##tc_script_res_{i}", script, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
                imgui.end_child()
            # Result Display
            imgui.text_colored(C_LBL, "Output:")
            imgui.same_line()
            if imgui.button(f"[+]##output_{i}"):
                self.show_text_viewer = True
                self.text_viewer_title = f"Call Output #{i}"
                self.text_viewer_content = result
            if self.ui_word_wrap:
                imgui.begin_child(f"tc_res_wrap_{i}", imgui.ImVec2(-1, 72), True)
                imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
                imgui.text(result)
                imgui.pop_text_wrap_pos()
                imgui.end_child()
            else:
                imgui.begin_child(f"tc_res_fixed_width_{i}", imgui.ImVec2(0, 72), True, imgui.WindowFlags_.horizontal_scrollbar)
                imgui.input_text_multiline(f"##tc_res_val_{i}", result, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
                imgui.end_child()
            imgui.separator()
    imgui.end_child()
def _render_mma_dashboard(self) -> None:
# Render the MMA dashboard: a one-line summary, conductor setup, the track
# browser and new-track form, global controls, active-track progress, the
# per-tier usage table and model selectors, the ticket DAG and the
# add-ticket form.
# NOTE(review): leading indentation is not visible in this copy of the
# file, so the nesting of some statements below cannot be confirmed.
# Task 5.3: Dense Summary Line
track_name = self.active_track.description if self.active_track else "None"
total_tickets = len(self.active_tickets)
done_tickets = sum(1 for t in self.active_tickets if t.get('status') == 'complete')
# Sum the estimated cost over all tiers for the summary line.
# (The same total is recomputed for the usage table further down.)
total_cost = 0.0
for stats in self.mma_tier_usage.values():
model = stats.get('model', 'unknown')
in_t = stats.get('input', 0)
out_t = stats.get('output', 0)
total_cost += cost_tracker.estimate_cost(model, in_t, out_t)
imgui.text("Track:")
imgui.same_line()
imgui.text_colored(C_VAL, track_name)
imgui.same_line()
imgui.text(" | Tickets:")
imgui.same_line()
imgui.text_colored(C_VAL, f"{done_tickets}/{total_tickets}")
imgui.same_line()
imgui.text(" | Cost:")
imgui.same_line()
imgui.text_colored(imgui.ImVec4(0, 1, 0, 1), f"${total_cost:,.4f}")
imgui.same_line()
imgui.text(" | Status:")
imgui.same_line()
# Status colour: grey idle, yellow running, green done, red error, white otherwise.
status_col = imgui.ImVec4(1, 1, 1, 1)
if self.mma_status == "idle": status_col = imgui.ImVec4(0.7, 0.7, 0.7, 1)
elif self.mma_status == "running": status_col = imgui.ImVec4(1, 1, 0, 1)
elif self.mma_status == "done": status_col = imgui.ImVec4(0, 1, 0, 1)
elif self.mma_status == "error": status_col = imgui.ImVec4(1, 0, 0, 1)
imgui.text_colored(status_col, self.mma_status.upper())
imgui.separator()
# 0. Conductor Setup
if imgui.collapsing_header("Conductor Setup"):
if imgui.button("Run Setup Scan"):
self._cb_run_conductor_setup()
if self.ui_conductor_setup_summary:
imgui.input_text_multiline("##setup_summary", self.ui_conductor_setup_summary, imgui.ImVec2(-1, 120), imgui.InputTextFlags_.read_only)
imgui.separator()
# 1. Track Browser
imgui.text("Track Browser")
if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
imgui.table_setup_column("Title")
imgui.table_setup_column("Status")
imgui.table_setup_column("Progress")
imgui.table_setup_column("Actions")
imgui.table_headers_row()
for track in self.tracks:
imgui.table_next_row()
imgui.table_next_column()
imgui.text(track.get("title", "Untitled"))
imgui.table_next_column()
# NOTE(review): a track whose "status" is present but None would raise on .lower() — confirm the schema.
status = track.get("status", "unknown").lower()
if status == "new":
imgui.text_colored(imgui.ImVec4(0.7, 0.7, 0.7, 1.0), "NEW")
elif status == "active":
imgui.text_colored(imgui.ImVec4(1.0, 1.0, 0.0, 1.0), "ACTIVE")
elif status == "done":
imgui.text_colored(imgui.ImVec4(0.0, 1.0, 0.0, 1.0), "DONE")
elif status == "blocked":
imgui.text_colored(imgui.ImVec4(1.0, 0.0, 0.0, 1.0), "BLOCKED")
else:
imgui.text(status)
imgui.table_next_column()
# Progress bar colour: red below 0.33, yellow below 0.66, green otherwise.
progress = track.get("progress", 0.0)
if progress < 0.33:
p_color = imgui.ImVec4(1.0, 0.0, 0.0, 1.0)
elif progress < 0.66:
p_color = imgui.ImVec4(1.0, 1.0, 0.0, 1.0)
else:
p_color = imgui.ImVec4(0.0, 1.0, 0.0, 1.0)
imgui.push_style_color(imgui.Col_.plot_histogram, p_color)
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%")
imgui.pop_style_color()
imgui.table_next_column()
if imgui.button(f"Load##{track.get('id')}"):
self._cb_load_track(str(track.get("id") or ""))
imgui.end_table()
# 1b. New Track Form
imgui.text("Create New Track")
changed_n, self.ui_new_track_name = imgui.input_text("Name##new_track", self.ui_new_track_name)
changed_d, self.ui_new_track_desc = imgui.input_text_multiline("Description##new_track", self.ui_new_track_desc, imgui.ImVec2(-1, 60))
imgui.text("Type:")
imgui.same_line()
if imgui.begin_combo("##track_type", self.ui_new_track_type):
for ttype in ["feature", "chore", "fix"]:
if imgui.selectable(ttype, self.ui_new_track_type == ttype)[0]:
self.ui_new_track_type = ttype
imgui.end_combo()
if imgui.button("Create Track"):
self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type)
# Clear the form fields after handing them to the callback.
self.ui_new_track_name = ""
self.ui_new_track_desc = ""
imgui.separator()
# 2. Global Controls
changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)
if changed:
# We could push an event here if the engine needs to know immediately
pass
imgui.same_line()
imgui.text(f"Status: {self.mma_status.upper()}")
if self.active_tier:
imgui.same_line()
imgui.text_colored(C_VAL, f"| Active: {self.active_tier}")
# Approval pending indicator
any_pending = (
self._pending_mma_spawn is not None or
self._pending_mma_approval is not None or
self._pending_ask_dialog
)
if any_pending:
# Pulse the label by driving its alpha with |sin(t)|.
alpha = abs(math.sin(time.time() * 5))
imgui.same_line()
imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.3, alpha), " APPROVAL PENDING")
imgui.same_line()
if imgui.button("Go to Approval"):
pass # scroll/focus handled by existing dialog rendering
imgui.separator()
# 2b. Active Track Info
if self.active_track:
imgui.text(f"Track: {self.active_track.description}")
# Progress bar
tickets = self.active_tickets
total = len(tickets)
if total > 0:
complete = sum(1 for t in tickets if t.get('status') == 'complete')
progress = complete / total
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets")
else:
imgui.text_disabled("No active MMA track.")
# 3. Token Usage Table
imgui.separator()
imgui.text("Tier Usage (Tokens & Cost)")
if imgui.begin_table("mma_usage", 5, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg):
imgui.table_setup_column("Tier")
imgui.table_setup_column("Model")
imgui.table_setup_column("Input")
imgui.table_setup_column("Output")
imgui.table_setup_column("Est. Cost")
imgui.table_headers_row()
usage = self.mma_tier_usage
total_cost = 0.0
for tier, stats in usage.items():
imgui.table_next_row()
imgui.table_next_column()
imgui.text(tier)
imgui.table_next_column()
model = stats.get('model', 'unknown')
imgui.text(model)
imgui.table_next_column()
in_t = stats.get('input', 0)
imgui.text(f"{in_t:,}")
imgui.table_next_column()
out_t = stats.get('output', 0)
imgui.text(f"{out_t:,}")
imgui.table_next_column()
cost = cost_tracker.estimate_cost(model, in_t, out_t)
total_cost += cost
imgui.text(f"${cost:,.4f}")
# Total Row
imgui.table_next_row()
imgui.table_set_bg_color(imgui.TableBgTarget_.row_bg0, imgui.get_color_u32(imgui.Col_.plot_lines_hovered))
imgui.table_next_column()
imgui.text("TOTAL")
# The three middle columns of the total row are left blank.
imgui.table_next_column()
imgui.text("")
imgui.table_next_column()
imgui.text("")
imgui.table_next_column()
imgui.text("")
imgui.table_next_column()
imgui.text(f"${total_cost:,.4f}")
imgui.end_table()
imgui.separator()
# 3b. Tier Model Config
if imgui.collapsing_header("Tier Model Config"):
for tier in self.mma_tier_usage.keys():
imgui.text(f"{tier}:")
imgui.same_line()
current_model = self.mma_tier_usage[tier].get("model", "unknown")
if imgui.begin_combo(f"##combo_{tier}", current_model):
for model in self.available_models:
if imgui.selectable(model, current_model == model)[0]:
# Update the live usage entry and record the choice under project["mma"]["tier_models"].
self.mma_tier_usage[tier]["model"] = model
self.project.setdefault("mma", {}).setdefault("tier_models", {})[tier] = model
imgui.end_combo()
imgui.separator()
# 4. Task DAG Visualizer
imgui.text("Task DAG")
if self.active_track:
tickets_by_id = {str(t.get('id') or ''): t for t in self.active_tickets}
all_ids = set(tickets_by_id.keys())
# Build children map
children_map: dict[str, list[str]] = {}
for t in self.active_tickets:
for dep in t.get('depends_on', []):
if dep not in children_map: children_map[dep] = []
children_map[dep].append(str(t.get('id') or ''))
# Roots are those whose depends_on elements are NOT in all_ids
roots = []
for t in self.active_tickets:
deps = t.get('depends_on', [])
has_local_dep = any(d in all_ids for d in deps)
if not has_local_dep:
roots.append(t)
# "rendered" is shared across all root calls to _render_ticket_dag_node.
rendered: set[str] = set()
for root in roots:
self._render_ticket_dag_node(root, tickets_by_id, children_map, rendered)
# 5. Add Ticket Form
imgui.separator()
if imgui.button("Add Ticket"):
self._show_add_ticket_form = not self._show_add_ticket_form
if self._show_add_ticket_form:
# Default Ticket ID
# Suggest the next "T-NNN" id after the highest numeric id currently in use.
max_id = 0
for t in self.active_tickets:
tid = t.get('id', '')
if tid.startswith('T-'):
# NOTE(review): the bare except also swallows KeyboardInterrupt/SystemExit;
# int() on a malformed suffix raises ValueError, which is presumably all
# that needs catching here — confirm.
try: max_id = max(max_id, int(tid[2:]))
except: pass
self.ui_new_ticket_id = f"T-{max_id + 1:03d}"
self.ui_new_ticket_desc = ""
self.ui_new_ticket_target = ""
self.ui_new_ticket_deps = ""
if self._show_add_ticket_form:
imgui.begin_child("add_ticket_form", imgui.ImVec2(-1, 220), True)
imgui.text_colored(C_VAL, "New Ticket Details")
_, self.ui_new_ticket_id = imgui.input_text("ID##new_ticket", self.ui_new_ticket_id)
_, self.ui_new_ticket_desc = imgui.input_text_multiline("Description##new_ticket", self.ui_new_ticket_desc, imgui.ImVec2(-1, 60))
_, self.ui_new_ticket_target = imgui.input_text("Target File##new_ticket", self.ui_new_ticket_target)
_, self.ui_new_ticket_deps = imgui.input_text("Depends On (IDs, comma-separated)##new_ticket", self.ui_new_ticket_deps)
if imgui.button("Create"):
# New tickets start as "todo" and are assigned to the tier-3 worker.
new_ticket = {
"id": self.ui_new_ticket_id,
"description": self.ui_new_ticket_desc,
"status": "todo",
"assigned_to": "tier3-worker",
"target_file": self.ui_new_ticket_target,
"depends_on": [d.strip() for d in self.ui_new_ticket_deps.split(",") if d.strip()]
}
self.active_tickets.append(new_ticket)
self._show_add_ticket_form = False
self._push_mma_state_update()
imgui.same_line()
if imgui.button("Cancel"):
self._show_add_ticket_form = False
imgui.end_child()
else:
imgui.text_disabled("No active MMA track.")
def _render_tier_stream_panel(self, tier_key: str, stream_key: str | None) -> None:
    """Draw the streamed text for one tier, or every "Tier 3" stream.

    With a ``stream_key`` the matching entry of ``self.mma_streams`` is shown
    in a single child region that fills the remaining space. With ``None``,
    every stream whose key contains "Tier 3" gets its own labelled,
    fixed-height child region.

    Each region scrolls to the bottom whenever the length of its text differs
    from the length recorded in ``self._tier_stream_last_len``.
    """

    def follow_tail(length_key, text) -> None:
        # Best effort: a TypeError/AttributeError here must not break the frame.
        try:
            size = len(text)
            if size != self._tier_stream_last_len.get(length_key, -1):
                imgui.set_scroll_here_y(1.0)
                self._tier_stream_last_len[length_key] = size
        except (TypeError, AttributeError):
            pass

    if stream_key is not None:
        stream_text = self.mma_streams.get(stream_key, "")
        imgui.begin_child(f"##stream_content_{tier_key}", imgui.ImVec2(-1, -1))
        imgui.text_wrapped(stream_text)
        follow_tail(stream_key, stream_text)
        imgui.end_child()
        return

    worker_keys = [name for name in self.mma_streams if "Tier 3" in name]
    if not worker_keys:
        imgui.text_disabled("No worker output yet.")
        return

    for worker_key in worker_keys:
        # The label is whatever follows the first ": "; the whole key otherwise.
        _, separator, remainder = worker_key.partition(": ")
        ticket_id = remainder if separator else worker_key
        imgui.text(ticket_id)
        imgui.begin_child(f"##tier3_{ticket_id}_scroll", imgui.ImVec2(-1, 150), True)
        worker_text = self.mma_streams[worker_key]
        imgui.text_wrapped(worker_text)
        follow_tail(worker_key, worker_text)
        imgui.end_child()
def _render_ticket_dag_node(self, ticket: dict[str, Any], tickets_by_id: dict[str, Any], children_map: dict[str, list[str]], rendered: set[str]) -> None:
    """Draw one ticket as a tree node and recurse into the tickets that depend on it.

    Args:
        ticket: The ticket dict to draw (reads 'id', 'target_file', 'status',
            'description' and 'depends_on').
        tickets_by_id: Lookup from ticket id to ticket dict, used to resolve children.
        children_map: Maps a ticket id to the ids of the tickets listed as its children.
        rendered: Ids already drawn during this pass. It is updated in place; a
            ticket reached a second time is drawn as a leaf tagged "(shown above)"
            and its children are not expanded again, which also stops recursion
            on cyclic dependencies.
    """
    tid = ticket.get('id', '??')
    is_duplicate = tid in rendered
    if not is_duplicate:
        rendered.add(tid)
    target = ticket.get('target_file', 'general')
    status = ticket.get('status', 'pending').upper()

    # Status -> colour of the marker bar and of the status label.
    status_color = vec4(178, 178, 178)
    if status == 'RUNNING':
        status_color = vec4(255, 255, 0)
    elif status == 'COMPLETE':
        status_color = vec4(0, 255, 0)
    elif status in ['BLOCKED', 'ERROR']:
        status_color = vec4(255, 0, 0)
    elif status == 'PAUSED':
        status_color = vec4(255, 165, 0)

    # 4px wide coloured bar in front of the row, one text line high.
    p_min = imgui.get_cursor_screen_pos()
    p_max = imgui.ImVec2(p_min.x + 4, p_min.y + imgui.get_text_line_height())
    imgui.get_window_draw_list().add_rect_filled(p_min, p_max, imgui.get_color_u32(status_color))
    imgui.set_cursor_screen_pos(imgui.ImVec2(p_min.x + 8, p_min.y))

    flags = imgui.TreeNodeFlags_.open_on_arrow | imgui.TreeNodeFlags_.open_on_double_click | imgui.TreeNodeFlags_.default_open
    children = children_map.get(tid, [])
    if not children or is_duplicate:
        flags |= imgui.TreeNodeFlags_.leaf
    node_open = imgui.tree_node_ex(f"##{tid}", flags)

    if imgui.is_item_hovered():
        imgui.begin_tooltip()
        imgui.text_colored(C_KEY, f"ID: {tid}")
        imgui.text_colored(C_LBL, f"Target: {target}")
        imgui.text_colored(C_LBL, "Description:")
        imgui.same_line()
        imgui.text_wrapped(ticket.get('description', 'N/A'))
        deps = ticket.get('depends_on', [])
        if deps:
            # str() keeps the tooltip alive if an id is not a string.
            imgui.text_colored(C_LBL, f"Depends on: {', '.join(str(d) for d in deps)}")
        stream_key = f"Tier 3: {tid}"
        if stream_key in self.mma_streams:
            imgui.separator()
            imgui.text_colored(C_KEY, "Worker Stream:")
            imgui.text_wrapped(self.mma_streams[stream_key])
        imgui.end_tooltip()

    # Row contents: id, target, status and the action buttons at fixed columns.
    imgui.same_line()
    imgui.text_colored(C_KEY, tid)
    imgui.same_line(150)
    imgui.text_disabled(str(target))
    imgui.same_line(400)
    imgui.text_colored(status_color, status)
    imgui.same_line(500)
    if imgui.button(f"Retry##{tid}"):
        self._cb_ticket_retry(tid)
    imgui.same_line()
    if imgui.button(f"Skip##{tid}"):
        self._cb_ticket_skip(tid)
    if status in ['TODO', 'BLOCKED']:
        imgui.same_line()
        if imgui.button(f"Delete##{tid}"):
            # Drop the ticket and scrub its id from every remaining dependency list.
            self.active_tickets = [t for t in self.active_tickets if t.get('id') != tid]
            for t in self.active_tickets:
                deps = t.get('depends_on', [])
                if tid in deps:
                    t['depends_on'] = [d for d in deps if d != tid]
            self._push_mma_state_update()
    if is_duplicate:
        imgui.same_line()
        imgui.text_disabled("(shown above)")

    # tree_pop() must mirror tree_node_ex(): pop exactly when the node reported
    # itself open (leaf/duplicate nodes included), never when it is closed.
    if node_open:
        if not is_duplicate:
            for child_id in children:
                child = tickets_by_id.get(child_id)
                if child:
                    self._render_ticket_dag_node(child, tickets_by_id, children_map, rendered)
        imgui.tree_pop()
def _render_comms_history_panel(self) -> None:
    """Draw the comms history: toolbar, colour legend and the scrolling entry list.

    The list shows ``self.prior_session_entries`` while a prior session is being
    viewed, otherwise a copy of ``self._comms_log`` (filtered by
    ``self.ui_focus_agent`` when that is set). Live entries whose ``local_ts``
    is less than three seconds old get a fading, pulsing green highlight.
    """
    # --- Toolbar -----------------------------------------------------------
    imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
    imgui.same_line()
    if imgui.button("Clear##comms"):
        ai_client.clear_comms_log()
        self._comms_log.clear()
    imgui.same_line()
    if imgui.button("Load Log"):
        self.cb_load_prior_log()
    if self.is_viewing_prior_session:
        imgui.same_line()
        if imgui.button("Exit Prior Session"):
            self.is_viewing_prior_session = False
            self.prior_session_entries.clear()
            self.ai_status = "idle"
        imgui.separator()
        imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
    imgui.separator()

    # --- Legend ------------------------------------------------------------
    imgui.text_colored(C_OUT, "OUT")
    imgui.same_line()
    imgui.text_colored(C_REQ, "request")
    imgui.same_line()
    imgui.text_colored(C_TC, "tool_call")
    imgui.same_line()
    imgui.text(" ")
    imgui.same_line()
    imgui.text_colored(C_IN, "IN")
    imgui.same_line()
    imgui.text_colored(C_RES, "response")
    imgui.same_line()
    imgui.text_colored(C_TR, "tool_result")
    imgui.separator()

    # Snapshot the flag once so the style push below is always matched by the
    # pop at the end, whatever happens to the attribute while entries render.
    viewing_prior = self.is_viewing_prior_session
    if viewing_prior:
        # Tinted background marks the prior-session view.
        imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20))
    imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)

    log_to_render = self.prior_session_entries if viewing_prior else list(self._comms_log)
    if self.ui_focus_agent and not viewing_prior:
        log_to_render = [e for e in log_to_render if e.get("source_tier") == self.ui_focus_agent]

    now = time.time()  # one timestamp per frame is enough for the blink
    for idx, entry in enumerate(log_to_render, start=1):
        # Blink effect: fades out over 3 s while pulsing with a sine wave.
        blink_alpha = 0.0
        local_ts = entry.get("local_ts", 0)
        if local_ts > 0 and not viewing_prior:
            elapsed = now - local_ts
            if elapsed < 3.0:
                blink_alpha = (1.0 - (elapsed / 3.0)) * 0.3 * (math.sin(elapsed * 10) * 0.5 + 0.5)
        highlight = blink_alpha > 0

        imgui.push_id(f"comms_{idx}")
        if highlight:
            # Group the entry so its bounding box is known once it is drawn.
            imgui.begin_group()

        # Header line: index, timestamp, direction, kind, provider/model, tier.
        d = entry.get("direction", "IN")
        k = entry.get("kind", "response")
        imgui.text_colored(vec4(160, 160, 160), f"#{idx}")
        imgui.same_line()
        imgui.text_colored(vec4(160, 160, 160), entry.get("ts", "00:00:00"))
        imgui.same_line()
        imgui.text_colored(DIR_COLORS.get(d, C_VAL), d)
        imgui.same_line()
        imgui.text_colored(KIND_COLORS.get(k, C_VAL), k)
        imgui.same_line()
        imgui.text_colored(C_LBL, f"{entry.get('provider', '?')}/{entry.get('model', '?')}")
        imgui.same_line()
        tier_label = entry.get("source_tier") or "main"
        imgui.text_colored(C_SUB, f"[{tier_label}]")

        # A payload stored as None is treated like a missing one.
        payload = entry.get("payload") or {}
        if k == "request":
            self._render_heavy_text("message", payload.get("message", ""))
        elif k == "response":
            imgui.text_colored(C_LBL, "round:")
            imgui.same_line()
            imgui.text_colored(C_VAL, str(payload.get("round", "")))
            imgui.text_colored(C_LBL, "stop_reason:")
            imgui.same_line()
            imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", "")))
            text = payload.get("text", "")
            if text:
                self._render_heavy_text("text", text)
            imgui.text_colored(C_LBL, "tool_calls:")
            # `or []` also covers an explicit None, which cannot be iterated.
            tcs = payload.get("tool_calls") or []
            if not tcs:
                imgui.text_colored(C_VAL, " (none)")
            for tc_i, tc in enumerate(tcs):
                imgui.text_colored(C_KEY, f" call[{tc_i}] {tc.get('name', '?')}")
                if tc.get("id"):
                    imgui.text_colored(C_LBL, " id:")
                    imgui.same_line()
                    imgui.text_colored(C_VAL, str(tc["id"]))
                if "args" in tc or "input" in tc:
                    self._render_heavy_text(f"call_{tc_i}_args", str(tc.get("args") or tc.get("input")))
        elif k == "tool_call":
            imgui.text_colored(C_KEY, payload.get("name", "?"))
            if payload.get("id"):
                imgui.text_colored(C_LBL, " id:")
                imgui.same_line()
                imgui.text_colored(C_VAL, str(payload["id"]))
            if "script" in payload:
                self._render_heavy_text("script", payload["script"])
            if "args" in payload:
                self._render_heavy_text("args", str(payload["args"]))
        elif k == "tool_result":
            imgui.text_colored(C_KEY, payload.get("name", "?"))
            if payload.get("id"):
                imgui.text_colored(C_LBL, " id:")
                imgui.same_line()
                imgui.text_colored(C_VAL, str(payload["id"]))
            if "output" in payload:
                self._render_heavy_text("output", payload["output"])
            if "results" in payload:
                for r_i, r in enumerate(payload["results"]):
                    imgui.text_colored(C_LBL, f" Result[{r_i}]:")
                    self._render_heavy_text(f"res_{r_i}", str(r))

        # Token usage, when the payload carries it (a None value is skipped).
        u = payload.get("usage")
        if u is not None:
            u_str = f"In: {u.get('input_tokens', 0)} Out: {u.get('output_tokens', 0)}"
            if u.get("cache_read_input_tokens"):
                u_str += f" (Cache: {u['cache_read_input_tokens']})"
            imgui.text_colored(C_SUB, f" Usage: {u_str}")
        imgui.separator()

        if highlight:
            imgui.end_group()
            # The group is now the "last item": paint a translucent green rect
            # over its bounding box to produce the blink.
            imgui.get_window_draw_list().add_rect_filled(
                imgui.get_item_rect_min(),
                imgui.get_item_rect_max(),
                imgui.get_color_u32(vec4(0, 255, 0, blink_alpha)),
            )
        imgui.pop_id()

    if self._scroll_comms_to_bottom:
        imgui.set_scroll_here_y(1.0)
        self._scroll_comms_to_bottom = False
    imgui.end_child()
    if viewing_prior:
        imgui.pop_style_color()
def _render_system_prompts_panel(self) -> None:
    """Draw the two multiline editors for the global and project system prompts.

    Whatever each editor returns as its text is written back to
    ``self.ui_global_system_prompt`` / ``self.ui_project_system_prompt``.
    """
    imgui.text("Global System Prompt (all projects)")
    _global_changed, global_text = imgui.input_text_multiline(
        "##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100)
    )
    self.ui_global_system_prompt = global_text

    imgui.separator()

    imgui.text("Project System Prompt")
    _project_changed, project_text = imgui.input_text_multiline(
        "##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100)
    )
    self.ui_project_system_prompt = project_text
def _render_theme_panel(self) -> None:
    """Draw the "Theme" window: palette picker, font path/size and UI scale.

    The window's open state is stored back into ``self.show_windows["Theme"]``.
    Font edits are written to the ``theme`` module; the "Apply Font" button
    flushes state to the config and saves it via ``save_config``.
    """
    expanded, still_open = imgui.begin("Theme", self.show_windows["Theme"])
    self.show_windows["Theme"] = bool(still_open)
    if not expanded:
        # begin() must always be paired with end(), even when collapsed.
        imgui.end()
        return

    # --- Palette -----------------------------------------------------------
    imgui.text("Palette")
    active_palette = theme.get_current_palette()
    if imgui.begin_combo("##pal", active_palette):
        for palette_name in theme.get_palette_names():
            picked = imgui.selectable(palette_name, palette_name == active_palette)[0]
            if picked:
                theme.apply(palette_name)
        imgui.end_combo()
    imgui.separator()

    # --- Font path ---------------------------------------------------------
    imgui.text("Font")
    imgui.push_item_width(-150)
    path_edited, typed_path = imgui.input_text("##fontp", theme.get_current_font_path())
    imgui.pop_item_width()
    if path_edited:
        theme._current_font_path = typed_path
    imgui.same_line()
    if imgui.button("Browse##font"):
        tk_root = hide_tk_root()
        chosen_file = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")])
        tk_root.destroy()
        if chosen_file:
            theme._current_font_path = chosen_file

    # --- Font size ---------------------------------------------------------
    imgui.text("Size (px)")
    imgui.same_line()
    imgui.push_item_width(100)
    size_edited, typed_size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f")
    if size_edited:
        theme._current_font_size = typed_size
    imgui.pop_item_width()
    imgui.same_line()
    if imgui.button("Apply Font (Requires Restart)"):
        self._flush_to_config()
        save_config(self.config)
        self.ai_status = "Font settings saved. Restart required."
    imgui.separator()

    # --- UI scale ----------------------------------------------------------
    imgui.text("UI Scale (DPI)")
    scale_edited, new_scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f")
    if scale_edited:
        theme.set_scale(new_scale)

    imgui.end()
def _load_fonts(self) -> None:
    """Load the font reported by ``theme.get_font_loading_params()``.

    Nothing is loaded when the path is empty or does not exist on disk.
    """
    font_path, font_size = theme.get_font_loading_params()
    if not font_path:
        return
    if not Path(font_path).exists():
        return
    hello_imgui.load_font(font_path, font_size)
def _post_init(self) -> None:
    """Apply the current theme; registered in ``run`` as the runner's ``post_init`` callback."""
    theme.apply_current()
def run(self) -> None:
    """Initializes the ImGui runner and starts the main application loop.

    With ``--headless`` in ``sys.argv`` no window is created: the app built by
    ``self.create_api()`` is served with uvicorn on the port taken from
    ``self.config["headless"]["port"]`` (default 8000).

    Otherwise the hello_imgui runner is configured and ``immapp.run`` blocks
    until the window closes. ``self.shutdown()`` and
    ``session_logger.close_session()`` then run even if the GUI loop raised.
    """
    if "--headless" in sys.argv:
        print("Headless mode active")
        self._fetch_models(self.current_provider)
        import uvicorn
        headless_cfg = self.config.get("headless", {})
        port = headless_cfg.get("port", 8000)
        api = self.create_api()
        # NOTE(review): 0.0.0.0 listens on every network interface - confirm
        # that exposing the API beyond localhost is intended.
        uvicorn.run(api, host="0.0.0.0", port=port)
        return

    theme.load_from_config(self.config)
    params = hello_imgui.RunnerParams()
    self.runner_params = params

    # Window
    params.app_window_params.window_title = "manual slop"
    params.app_window_params.window_geometry.size = (1680, 1200)
    params.imgui_window_params.enable_viewports = False
    params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
    params.fps_idling.enable_idling = False
    params.imgui_window_params.show_menu_bar = True

    # Layout persistence
    params.ini_folder_type = hello_imgui.IniFolderType.current_folder
    params.ini_filename = "manualslop_layout.ini"

    # Callbacks
    params.callbacks.show_gui = self._gui_func
    params.callbacks.show_menus = self._show_menus
    params.callbacks.load_additional_fonts = self._load_fonts
    params.callbacks.post_init = self._post_init

    self._fetch_models(self.current_provider)
    try:
        immapp.run(self.runner_params)
    finally:
        # On exit: always clean up, and close the session log even if
        # shutdown() itself fails.
        try:
            self.shutdown()
        finally:
            session_logger.close_session()
def main() -> None:
    """Create the application object and run it."""
    App().run()
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    main()