# gui_2.py from __future__ import annotations import tomli_w import time import math import json import sys import os from pathlib import Path from tkinter import filedialog, Tk from typing import Optional, Any from src import ai_client from src import cost_tracker from src import session_logger from src import project_manager from src import paths from src import theme_2 as theme from src import api_hooks import numpy as np from src import log_registry from src import log_pruner from src import models from src import app_controller from pydantic import BaseModel from imgui_bundle import imgui, hello_imgui, immapp, imgui_node_editor as ed PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek", "minimax"] COMMS_CLAMP_CHARS: int = 300 def hide_tk_root() -> Tk: root = Tk() root.withdraw() root.wm_attributes("-topmost", True) return root # Color Helpers def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4: return imgui.ImVec4(r/255, g/255, b/255, a) C_OUT: imgui.ImVec4 = vec4(100, 200, 255) C_IN: imgui.ImVec4 = vec4(140, 255, 160) C_REQ: imgui.ImVec4 = vec4(255, 220, 100) C_RES: imgui.ImVec4 = vec4(180, 255, 180) C_TC: imgui.ImVec4 = vec4(255, 180, 80) C_TR: imgui.ImVec4 = vec4(180, 220, 255) C_TRS: imgui.ImVec4 = vec4(200, 180, 255) C_LBL: imgui.ImVec4 = vec4(180, 180, 180) C_VAL: imgui.ImVec4 = vec4(220, 220, 220) C_KEY: imgui.ImVec4 = vec4(140, 200, 255) C_NUM: imgui.ImVec4 = vec4(180, 255, 180) C_SUB: imgui.ImVec4 = vec4(220, 200, 120) DIR_COLORS: dict[str, imgui.ImVec4] = {"OUT": C_OUT, "IN": C_IN} KIND_COLORS: dict[str, imgui.ImVec4] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS} HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"} def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]: if max_pairs <= 0: return [] count = 0 target = max_pairs * 2 for i in range(len(entries) - 1, -1, -1): role = entries[i].get("role", "") if role in ("User", "AI"): count += 1 if count == target: return entries[i:] return entries class GenerateRequest(BaseModel): prompt: str auto_add_history: bool = True temperature: float | None = None max_tokens: int | None = None class ConfirmRequest(BaseModel): approved: bool script: Optional[str] = None class App: """The main ImGui interface orchestrator for Manual Slop.""" def __init__(self) -> None: # Initialize controller and delegate state self.controller = app_controller.AppController() # Restore legacy PROVIDERS to controller if needed (it already has it via delegation if set on class level, but let's be explicit) if not hasattr(self.controller, 'PROVIDERS'): self.controller.PROVIDERS = PROVIDERS self.controller.init_state() self.controller.start_services(self) # Aliases for controller-owned locks self._send_thread_lock = self.controller._send_thread_lock self._disc_entries_lock = self.controller._disc_entries_lock self._pending_comms_lock = self.controller._pending_comms_lock self._pending_tool_calls_lock = self.controller._pending_tool_calls_lock self._pending_history_adds_lock = self.controller._pending_history_adds_lock self._pending_gui_tasks_lock = self.controller._pending_gui_tasks_lock self._pending_dialog_lock = self.controller._pending_dialog_lock self._api_event_queue_lock = self.controller._api_event_queue_lock self._discussion_names_cache: list[str] = [] self._discussion_names_dirty: bool = True # Initialize node editor context self.node_editor_config = ed.Config() self.node_editor_ctx = ed.create_editor(self.node_editor_config) self.ui_selected_ticket_id: Optional[str] = None self._autofocus_response_tab = False gui_cfg = self.config.get("gui", {}) self.ui_separate_message_panel = gui_cfg.get("separate_message_panel", False) self.ui_separate_response_panel = gui_cfg.get("separate_response_panel", False) self.ui_separate_tool_calls_panel = gui_cfg.get("separate_tool_calls_panel", False) self._comms_log_cache: list[dict[str, Any]] = [] self._comms_log_dirty: bool = True self._tool_log_cache: list[dict[str, Any]] = [] self._tool_log_dirty: bool = True self._last_ui_focus_agent: Optional[str] = None self._log_registry: Optional[log_registry.LogRegistry] = None def _handle_approve_tool(self, user_data=None) -> None: """UI-level wrapper for approving a pending tool execution ask.""" self._handle_approve_ask() def _handle_approve_mma_step(self, user_data=None) -> None: """UI-level wrapper for approving a pending MMA step.""" self._handle_mma_respond(approved=True) def _handle_approve_spawn(self, user_data=None) -> None: """UI-level wrapper for approving a pending MMA sub-agent spawn.""" self._handle_mma_respond(approved=True) def __getattr__(self, name: str) -> Any: if name != 'controller' and hasattr(self, 'controller') and hasattr(self.controller, name): return getattr(self.controller, name) raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __setattr__(self, name: str, value: Any) -> None: if name == 'controller': super().__setattr__(name, value) elif hasattr(self, 'controller') and hasattr(self.controller, name): setattr(self.controller, name, value) else: super().__setattr__(name, value) @property def current_provider(self) -> str: return self.controller.current_provider @current_provider.setter def current_provider(self, value: str) -> None: self.controller.current_provider = value @property def current_model(self) -> str: return self.controller.current_model @current_model.setter def current_model(self, value: str) -> None: self.controller.current_model = value # ---------------------------------------------------------------- project loading # ---------------------------------------------------------------- logic def shutdown(self) -> None: """Cleanly shuts down the app's background tasks and saves state.""" try: if hasattr(self, 'runner_params') and self.runner_params.ini_filename: imgui.save_ini_settings_to_disk(self.runner_params.ini_filename) except: pass self.controller.shutdown() def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" # Ensure the directory exists if running from a different cwd os.makedirs("tests/artifacts", exist_ok=True) with open("tests/artifacts/temp_callback_output.txt", "w") as f: f.write(data) # ---------------------------------------------------------------- helpers def _render_text_viewer(self, label: str, content: str) -> None: if imgui.button("[+]##" + str(id(content))): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content def _render_heavy_text(self, label: str, content: str) -> None: imgui.text_colored(C_LBL, f"{label}:") imgui.same_line() if imgui.button("[+]##" + label): self.show_text_viewer = True self.text_viewer_title = label self.text_viewer_content = content if not content: imgui.text_disabled("(empty)") return if len(content) > COMMS_CLAMP_CHARS: # Use a fixed-height multi-line input box for large text to avoid expensive frame-by-frame wrapping imgui.begin_child(f"heavy_text_child_{label}_{hash(content)}", imgui.ImVec2(0, 80), True) imgui.input_text_multiline(f"##{label}_input", content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() else: if self.ui_word_wrap: imgui.push_text_wrap_pos(0.0) imgui.text_unformatted(content) imgui.pop_text_wrap_pos() else: imgui.text_unformatted(content) # ---------------------------------------------------------------- gui def _show_menus(self) -> None: if imgui.begin_menu("manual slop"): if imgui.menu_item("Quit", "Ctrl+Q", False)[0]: self.runner_params.app_shall_exit = True imgui.end_menu() if imgui.begin_menu("Windows"): for w in self.show_windows.keys(): _, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w]) imgui.end_menu() if imgui.begin_menu("Project"): if imgui.menu_item("Save All", "", False)[0]: self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) self.ai_status = "config saved" if imgui.menu_item("Reset Session", "", False)[0]: ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._comms_log.clear() self.ai_status = "session reset" self.ai_response = "" if imgui.menu_item("Generate MD Only", "", False)[0]: try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" except Exception as e: self.ai_status = f"error: {e}" imgui.end_menu() def _gui_func(self) -> None: try: self.perf_monitor.start_frame() self._autofocus_response_tab = self.controller._autofocus_response_tab # Process GUI task queue # DEBUG: Check if tasks exist before processing if hasattr(self, 'controller') and hasattr(self.controller, '_pending_gui_tasks'): pending_count = len(self.controller._pending_gui_tasks) if pending_count > 0: sys.stderr.write(f"[DEBUG gui_2] _gui_func: found {pending_count} pending tasks\n") sys.stderr.flush() self._process_pending_gui_tasks() self._process_pending_history_adds() self._render_track_proposal_modal() self._render_patch_modal() # Auto-save (every 60s) now = time.time() if now - self._last_autosave >= self._autosave_interval: self._last_autosave = now try: self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) except Exception: pass # silent — don't disrupt the GUI loop # Sync pending comms with self._pending_comms_lock: if self._pending_comms: if self.ui_auto_scroll_comms: self._scroll_comms_to_bottom = True self._comms_log_dirty = True for c in self._pending_comms: self._comms_log.append(c) self._pending_comms.clear() if self.ui_focus_agent != self._last_ui_focus_agent: self._comms_log_dirty = True self._tool_log_dirty = True self._last_ui_focus_agent = self.ui_focus_agent if self._comms_log_dirty: if self.is_viewing_prior_session: self._comms_log_cache = self.prior_session_entries else: log_raw = list(self._comms_log) if self.ui_focus_agent: self._comms_log_cache = [e for e in log_raw if e.get("source_tier") == self.ui_focus_agent] else: self._comms_log_cache = log_raw self._comms_log_dirty = False if self._tool_log_dirty: log_raw = list(self._tool_log) if self.ui_focus_agent: self._tool_log_cache = [e for e in log_raw if e.get("source_tier") == self.ui_focus_agent] else: self._tool_log_cache = log_raw self._tool_log_dirty = False with self._pending_tool_calls_lock: if self._pending_tool_calls: if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True self._tool_log_dirty = True for tc in self._pending_tool_calls: self._tool_log.append(tc) self._pending_tool_calls.clear() if self.show_windows.get("Context Hub", False): exp, opened = imgui.begin("Context Hub", self.show_windows["Context Hub"]) self.show_windows["Context Hub"] = bool(opened) if exp: self._render_projects_panel() imgui.end() if self.show_windows.get("Files & Media", False): exp, opened = imgui.begin("Files & Media", self.show_windows["Files & Media"]) self.show_windows["Files & Media"] = bool(opened) if exp: if imgui.collapsing_header("Files"): self._render_files_panel() if imgui.collapsing_header("Screenshots"): self._render_screenshots_panel() imgui.end() if self.show_windows.get("AI Settings", False): exp, opened = imgui.begin("AI Settings", self.show_windows["AI Settings"]) self.show_windows["AI Settings"] = bool(opened) if exp: if imgui.collapsing_header("Provider & Model"): self._render_provider_panel() if imgui.collapsing_header("Token Budget"): self._render_token_budget_panel() if imgui.collapsing_header("System Prompts"): self._render_system_prompts_panel() imgui.end() if self.show_windows.get("MMA Dashboard", False): exp, opened = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"]) self.show_windows["MMA Dashboard"] = bool(opened) if exp: self._render_mma_dashboard() imgui.end() if self.show_windows.get("Tier 1: Strategy", False): exp, opened = imgui.begin("Tier 1: Strategy", self.show_windows["Tier 1: Strategy"]) self.show_windows["Tier 1: Strategy"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 1", "Tier 1") imgui.end() if self.show_windows.get("Tier 2: Tech Lead", False): exp, opened = imgui.begin("Tier 2: Tech Lead", self.show_windows["Tier 2: Tech Lead"]) self.show_windows["Tier 2: Tech Lead"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 2", "Tier 2 (Tech Lead)") imgui.end() if self.show_windows.get("Tier 3: Workers", False): exp, opened = imgui.begin("Tier 3: Workers", self.show_windows["Tier 3: Workers"]) self.show_windows["Tier 3: Workers"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 3", None) imgui.end() if self.show_windows.get("Tier 4: QA", False): exp, opened = imgui.begin("Tier 4: QA", self.show_windows["Tier 4: QA"]) self.show_windows["Tier 4: QA"] = bool(opened) if exp: self._render_tier_stream_panel("Tier 4", "Tier 4 (QA)") imgui.end() if self.show_windows.get("Theme", False): self._render_theme_panel() if self.show_windows.get("Discussion Hub", False): exp, opened = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"]) self.show_windows["Discussion Hub"] = bool(opened) if exp: # Top part for the history imgui.begin_child("HistoryChild", size=(0, -200)) self._render_discussion_panel() imgui.end_child() # Bottom part with tabs for message and response # Detach controls imgui.push_style_var(imgui.StyleVar_.item_spacing, imgui.ImVec2(10, 4)) ch1, self.ui_separate_message_panel = imgui.checkbox("Pop Out Message", self.ui_separate_message_panel) imgui.same_line() ch2, self.ui_separate_response_panel = imgui.checkbox("Pop Out Response", self.ui_separate_response_panel) if ch1: self.show_windows["Message"] = self.ui_separate_message_panel if ch2: self.show_windows["Response"] = self.ui_separate_response_panel imgui.pop_style_var() show_message_tab = not self.ui_separate_message_panel show_response_tab = not self.ui_separate_response_panel if show_message_tab or show_response_tab: if imgui.begin_tab_bar("discussion_tabs"): # Task: Auto-focus Response tab when response received tab_flags = imgui.TabItemFlags_.none if self._autofocus_response_tab: tab_flags = imgui.TabItemFlags_.set_selected self._autofocus_response_tab = False self.controller._autofocus_response_tab = False if show_message_tab: if imgui.begin_tab_item("Message", None)[0]: self._render_message_panel() imgui.end_tab_item() if show_response_tab: if imgui.begin_tab_item("Response", None, tab_flags)[0]: self._render_response_panel() imgui.end_tab_item() imgui.end_tab_bar() else: imgui.text_disabled("Message & Response panels are detached.") imgui.end() if self.show_windows.get("Operations Hub", False): exp, opened = imgui.begin("Operations Hub", self.show_windows["Operations Hub"]) self.show_windows["Operations Hub"] = bool(opened) if exp: imgui.text("Focus Agent:") imgui.same_line() focus_label = self.ui_focus_agent or "All" if imgui.begin_combo("##focus_agent", focus_label, imgui.ComboFlags_.width_fit_preview): if imgui.selectable("All", self.ui_focus_agent is None)[0]: self.ui_focus_agent = None for tier in ["Tier 2", "Tier 3", "Tier 4"]: if imgui.selectable(tier, self.ui_focus_agent == tier)[0]: self.ui_focus_agent = tier imgui.end_combo() imgui.same_line() if self.ui_focus_agent: if imgui.button("x##clear_focus"): self.ui_focus_agent = None if exp: imgui.push_style_var(imgui.StyleVar_.item_spacing, imgui.ImVec2(10, 4)) ch, self.ui_separate_tool_calls_panel = imgui.checkbox("Pop Out Tool Calls", self.ui_separate_tool_calls_panel) if ch: self.show_windows["Tool Calls"] = self.ui_separate_tool_calls_panel imgui.pop_style_var() show_tc_tab = not self.ui_separate_tool_calls_panel if imgui.begin_tab_bar("ops_tabs"): if imgui.begin_tab_item("Comms History")[0]: self._render_comms_history_panel() imgui.end_tab_item() if show_tc_tab: if imgui.begin_tab_item("Tool Calls")[0]: self._render_tool_calls_panel() imgui.end_tab_item() imgui.end_tab_bar() imgui.end() if self.ui_separate_message_panel and self.show_windows.get("Message", False): exp, opened = imgui.begin("Message", self.show_windows["Message"]) self.show_windows["Message"] = bool(opened) if exp: self._render_message_panel() imgui.end() if self.ui_separate_response_panel and self.show_windows.get("Response", False): exp, opened = imgui.begin("Response", self.show_windows["Response"]) self.show_windows["Response"] = bool(opened) if exp: self._render_response_panel() imgui.end() if self.ui_separate_tool_calls_panel and self.show_windows.get("Tool Calls", False): exp, opened = imgui.begin("Tool Calls", self.show_windows["Tool Calls"]) self.show_windows["Tool Calls"] = bool(opened) if exp: self._render_tool_calls_panel() imgui.end() if self.show_windows.get("Log Management", False): self._render_log_management() if self.show_windows["Diagnostics"]: exp, opened = imgui.begin("Diagnostics", self.show_windows["Diagnostics"]) self.show_windows["Diagnostics"] = bool(opened) if exp: now = time.time() if now - self._perf_last_update >= 0.5: self._perf_last_update = now metrics = self.perf_monitor.get_metrics() self.perf_history["frame_time"].pop(0) self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0)) self.perf_history["fps"].pop(0) self.perf_history["fps"].append(metrics.get("fps", 0.0)) self.perf_history["cpu"].pop(0) self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0)) self.perf_history["input_lag"].pop(0) self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0)) metrics = self.perf_monitor.get_metrics() imgui.text("Performance Telemetry") imgui.separator() if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h): imgui.table_setup_column("Metric") imgui.table_setup_column("Value") imgui.table_headers_row() imgui.table_next_row() imgui.table_next_column() imgui.text("FPS") imgui.table_next_column() imgui.text(f"{metrics.get('fps', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Frame Time (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}") imgui.table_next_row() imgui.table_next_column() imgui.text("CPU %") imgui.table_next_column() imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}") imgui.table_next_row() imgui.table_next_column() imgui.text("Input Lag (ms)") imgui.table_next_column() imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}") imgui.end_table() imgui.separator() imgui.text("Frame Time (ms)") imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60)) imgui.text("CPU %") imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60)) imgui.end() self.perf_monitor.end_frame() # ---- Modals / Popups with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: if not self._pending_dialog_open: imgui.open_popup("Approve PowerShell Command") self._pending_dialog_open = True else: self._pending_dialog_open = False if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]: if not dlg: imgui.close_current_popup() else: imgui.text("The AI wants to run the following PowerShell script:") imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}") imgui.separator() # Checkbox to toggle full preview inside modal _, self.show_text_viewer = imgui.checkbox("Show Full Preview", self.show_text_viewer) if self.show_text_viewer: imgui.begin_child("preview_child", imgui.ImVec2(600, 300), True) imgui.text_unformatted(dlg._script) imgui.end_child() else: ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 200)) imgui.separator() if imgui.button("Approve & Run", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.same_line() if imgui.button("Reject", imgui.ImVec2(120, 0)): with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() with self._pending_dialog_lock: self._pending_dialog = None imgui.close_current_popup() imgui.end_popup() if self._pending_ask_dialog: if not self._ask_dialog_open: imgui.open_popup("Approve Tool Execution") self._ask_dialog_open = True else: self._ask_dialog_open = False if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_ask_dialog or self._ask_tool_data is None: imgui.close_current_popup() else: tool_name = self._ask_tool_data.get("tool", "unknown") tool_args = self._ask_tool_data.get("args", {}) imgui.text("The AI wants to execute a tool:") imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}") imgui.separator() imgui.text("Arguments:") imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True) imgui.text_unformatted(json.dumps(tool_args, indent=2)) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_approve_ask() imgui.close_current_popup() imgui.same_line() if imgui.button("Deny", imgui.ImVec2(120, 0)): self._handle_reject_ask() imgui.close_current_popup() imgui.end_popup() # MMA Step Approval Modal if self._pending_mma_approval: if not self._mma_approval_open: imgui.open_popup("MMA Step Approval") self._mma_approval_open = True self._mma_approval_edit_mode = False self._mma_approval_payload = self._pending_mma_approval.get("payload", "") else: self._mma_approval_open = False if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_approval: imgui.close_current_popup() else: ticket_id = self._pending_mma_approval.get("ticket_id", "??") imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.") imgui.separator() if self._mma_approval_edit_mode: imgui.text("Edit Raw Payload (Manual Memory Mutation):") _, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400)) else: imgui.text("Proposed Tool Call:") imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True) imgui.text_unformatted(str(self._pending_mma_approval.get("payload", ""))) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, payload=self._mma_approval_payload) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)): self._mma_approval_edit_mode = not self._mma_approval_edit_mode imgui.same_line() if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False) imgui.close_current_popup() imgui.end_popup() # MMA Spawn Approval Modal if self._pending_mma_spawn: if not self._mma_spawn_open: imgui.open_popup("MMA Spawn Approval") self._mma_spawn_open = True self._mma_spawn_edit_mode = False self._mma_spawn_prompt = self._pending_mma_spawn.get("prompt", "") self._mma_spawn_context = self._pending_mma_spawn.get("context_md", "") else: self._mma_spawn_open = False if imgui.begin_popup_modal("MMA Spawn Approval", None, imgui.WindowFlags_.always_auto_resize)[0]: if not self._pending_mma_spawn: imgui.close_current_popup() else: role = self._pending_mma_spawn.get("role", "??") ticket_id = self._pending_mma_spawn.get("ticket_id", "??") imgui.text(f"Spawning {role} for Ticket {ticket_id}") imgui.separator() if self._mma_spawn_edit_mode: imgui.text("Edit Prompt:") _, self._mma_spawn_prompt = imgui.input_text_multiline("##spawn_prompt", self._mma_spawn_prompt, imgui.ImVec2(800, 200)) imgui.text("Edit Context MD:") _, self._mma_spawn_context = imgui.input_text_multiline("##spawn_context", self._mma_spawn_context, imgui.ImVec2(800, 300)) else: imgui.text("Proposed Prompt:") imgui.begin_child("spawn_prompt_preview", imgui.ImVec2(800, 150), True) imgui.text_unformatted(self._mma_spawn_prompt) imgui.end_child() imgui.text("Proposed Context MD:") imgui.begin_child("spawn_context_preview", imgui.ImVec2(800, 250), True) imgui.text_unformatted(self._mma_spawn_context) imgui.end_child() imgui.separator() if imgui.button("Approve", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=True, prompt=self._mma_spawn_prompt, context_md=self._mma_spawn_context) imgui.close_current_popup() imgui.same_line() if imgui.button("Edit Mode" if not self._mma_spawn_edit_mode else "Preview Mode", imgui.ImVec2(120, 0)): self._mma_spawn_edit_mode = not self._mma_spawn_edit_mode imgui.same_line() if imgui.button("Abort", imgui.ImVec2(120, 0)): self._handle_mma_respond(approved=False, abort=True) imgui.close_current_popup() imgui.end_popup() # Cycle Detected Popup if imgui.begin_popup_modal("Cycle Detected!", None, imgui.WindowFlags_.always_auto_resize)[0]: imgui.text_colored(imgui.ImVec4(1, 0.3, 0.3, 1), "The dependency graph contains a cycle!") imgui.text("Please remove the circular dependency.") if imgui.button("OK"): imgui.close_current_popup() imgui.end_popup() if self.show_script_output: if self._trigger_script_blink: self._trigger_script_blink = False self._is_script_blinking = True self._script_blink_start_time = time.time() try: imgui.set_window_focus("Last Script Output") # type: ignore[call-arg] except Exception: pass if self._is_script_blinking: elapsed = time.time() - self._script_blink_start_time if elapsed > 1.5: self._is_script_blinking = False else: val = math.sin(elapsed * 8 * math.pi) alpha = 60/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha)) imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever) expanded, opened = imgui.begin("Last Script Output", self.show_script_output) self.show_script_output = bool(opened) if expanded: imgui.text("Script:") imgui.same_line() self._render_text_viewer("Last Script", self.ui_last_script_text) if self.ui_word_wrap: imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_text) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only) imgui.separator() imgui.text("Output:") imgui.same_line() self._render_text_viewer("Last Output", self.ui_last_script_output) if self.ui_word_wrap: imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.ui_last_script_output) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) if self._is_script_blinking: imgui.pop_style_color(2) imgui.end() if self.show_text_viewer: imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever) expanded, opened = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer) self.show_text_viewer = bool(opened) if expanded: if self.ui_word_wrap: imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False) imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(self.text_viewer_content) imgui.pop_text_wrap_pos() imgui.end_child() else: imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end() except Exception as e: print(f"ERROR in _gui_func: {e}") import traceback traceback.print_exc() def _render_projects_panel(self) -> None: proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem) imgui.text_colored(C_IN, f"Active: {proj_name}") imgui.separator() imgui.text("Git Directory") ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir) imgui.same_line() if imgui.button("Browse##git"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Git Directory") r.destroy() if d: self.ui_project_git_dir = d imgui.separator() imgui.text("Main Context File") ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context) imgui.same_line() if imgui.button("Browse##ctx"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select Main Context File") r.destroy() if p: self.ui_project_main_context = p imgui.separator() imgui.text("Output Dir") ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir) imgui.same_line() if imgui.button("Browse##out"): r = hide_tk_root() d = filedialog.askdirectory(title="Select Output Dir") r.destroy() if d: self.ui_output_dir = d imgui.separator() imgui.text("Project Files") imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True) for i, pp in enumerate(self.project_paths): is_active = (pp == self.active_project_path) if imgui.button(f"x##p{i}"): removed = self.project_paths.pop(i) if removed == self.active_project_path and self.project_paths: self._switch_project(self.project_paths[0]) break imgui.same_line() marker = " *" if is_active else "" if is_active: imgui.push_style_color(imgui.Col_.text, C_IN) if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"): self._switch_project(pp) if is_active: imgui.pop_style_color() imgui.same_line() imgui.text_colored(C_LBL, pp) imgui.end_child() if imgui.button("Add Project"): r = hide_tk_root() p = filedialog.askopenfilename( title="Select Project .toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")], ) r.destroy() if p and p not in self.project_paths: self.project_paths.append(p) imgui.same_line() if imgui.button("New Project"): r = hide_tk_root() p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")]) r.destroy() if p: name = Path(p).stem proj = project_manager.default_project(name) project_manager.save_project(proj, p) if p not in self.project_paths: self.project_paths.append(p) self._switch_project(p) imgui.same_line() if imgui.button("Save All"): self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) self.ai_status = "config saved" ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap) ch, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only) ch, self.ui_auto_scroll_comms = imgui.checkbox("Auto-scroll Comms History", self.ui_auto_scroll_comms) ch, self.ui_auto_scroll_tool_calls = imgui.checkbox("Auto-scroll Tool History", self.ui_auto_scroll_tool_calls) if imgui.collapsing_header("Agent Tools"): for t_name in models.AGENT_TOOL_NAMES: val = self.ui_agent_tools.get(t_name, True) ch, val = imgui.checkbox(f"Enable {t_name}", val) if ch: self.ui_agent_tools[t_name] = val def _render_track_proposal_modal(self) -> None: if self._show_track_proposal_modal: imgui.open_popup("Track Proposal") if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]: if not self._show_track_proposal_modal: imgui.close_current_popup() imgui.end_popup() return imgui.text_colored(C_IN, "Proposed Implementation Tracks") imgui.separator() if not self.proposed_tracks: imgui.text("No tracks generated.") else: for idx, track in enumerate(self.proposed_tracks): # Title Edit changed_t, new_t = imgui.input_text(f"Title##{idx}", track.get('title', '')) if changed_t: track['title'] = new_t # Goal Edit changed_g, new_g = imgui.input_text_multiline(f"Goal##{idx}", track.get('goal', ''), imgui.ImVec2(-1, 60)) if changed_g: track['goal'] = new_g # Buttons if imgui.button(f"Remove##{idx}"): self.proposed_tracks.pop(idx) break imgui.same_line() if imgui.button(f"Start This Track##{idx}"): self._cb_start_track(idx) imgui.separator() if imgui.button("Accept", imgui.ImVec2(120, 0)): self._cb_accept_tracks() self._show_track_proposal_modal = False imgui.close_current_popup() imgui.same_line() if imgui.button("Cancel", imgui.ImVec2(120, 0)): self._show_track_proposal_modal = False imgui.close_current_popup() imgui.end_popup() def _render_patch_modal(self) -> None: if not self._show_patch_modal: return imgui.open_popup("Apply Patch?") if imgui.begin_popup_modal("Apply Patch?", True, imgui.ImVec2(600, 500))[0]: imgui.text_colored(imgui.ImVec4(1, 0.9, 0.3, 1), "Tier 4 QA Generated a Patch") imgui.separator() if self._pending_patch_files: imgui.text("Files to modify:") for f in self._pending_patch_files: imgui.text(f" - {f}") imgui.separator() if self._patch_error_message: imgui.text_colored(imgui.ImVec4(1, 0.3, 0.3, 1), f"Error: {self._patch_error_message}") imgui.separator() imgui.text("Diff Preview:") imgui.begin_child("patch_diff_scroll", imgui.ImVec2(-1, 280), True) if self._pending_patch_text: diff_lines = self._pending_patch_text.split("\n") for line in diff_lines: if line.startswith("+++") or line.startswith("---") or line.startswith("@@"): imgui.text_colored(imgui.ImVec4(0.3, 0.7, 1, 1), line) elif line.startswith("+"): imgui.text_colored(imgui.ImVec4(0.2, 0.9, 0.2, 1), line) elif line.startswith("-"): imgui.text_colored(imgui.ImVec4(0.9, 0.2, 0.2, 1), line) else: imgui.text(line) imgui.end_child() imgui.separator() if imgui.button("Apply Patch", imgui.ImVec2(150, 0)): self._apply_pending_patch() imgui.same_line() if imgui.button("Reject", imgui.ImVec2(150, 0)): self._show_patch_modal = False self._pending_patch_text = None self._pending_patch_files = [] self._patch_error_message = None imgui.close_current_popup() imgui.end_popup() def _apply_pending_patch(self) -> None: if not self._pending_patch_text: self._patch_error_message = "No patch to apply" return try: from src.diff_viewer import apply_patch_to_file base_dir = str(self.controller.current_project_dir) if hasattr(self.controller, 'current_project_dir') else "." success, msg = apply_patch_to_file(self._pending_patch_text, base_dir) if success: self._show_patch_modal = False self._pending_patch_text = None self._pending_patch_files = [] self._patch_error_message = None imgui.close_current_popup() else: self._patch_error_message = msg except Exception as e: self._patch_error_message = str(e) def request_patch_from_tier4(self, error: str, file_context: str) -> None: try: from src import ai_client from src.diff_viewer import parse_diff patch_text = ai_client.run_tier4_patch_generation(error, file_context) if patch_text and "---" in patch_text and "+++" in patch_text: diff_files = parse_diff(patch_text) file_paths = [df.old_path for df in diff_files] self._pending_patch_text = patch_text self._pending_patch_files = file_paths self._show_patch_modal = True else: self._patch_error_message = patch_text or "No patch generated" except Exception as e: self._patch_error_message = str(e) def _render_log_management(self) -> None: exp, opened = imgui.begin("Log Management", self.show_windows["Log Management"]) self.show_windows["Log Management"] = bool(opened) if not exp: imgui.end() return if self._log_registry is None: self._log_registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) else: # Refresh data occasionally or on demand? For now let's just use the cached object. # The LogRegistry object loads data into self.data upon __init__. # We might want a refresh button or to reload every few seconds. if imgui.button("Refresh Registry"): self._log_registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) imgui.same_line() registry = self._log_registry sessions = registry.data if imgui.begin_table("sessions_table", 7, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Session ID") imgui.table_setup_column("Start Time") imgui.table_setup_column("Star") imgui.table_setup_column("Reason") imgui.table_setup_column("Size (KB)") imgui.table_setup_column("Msgs") imgui.table_setup_column("Actions") imgui.table_headers_row() for session_id, s_data in sessions.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(session_id) imgui.table_next_column() imgui.text(s_data.get("start_time", "")) imgui.table_next_column() whitelisted = s_data.get("whitelisted", False) if whitelisted: imgui.text_colored(vec4(255, 215, 0), "YES") else: imgui.text("NO") metadata = s_data.get("metadata") or {} imgui.table_next_column() imgui.text(metadata.get("reason", "")) imgui.table_next_column() imgui.text(str(metadata.get("size_kb", ""))) imgui.table_next_column() imgui.text(str(metadata.get("message_count", ""))) imgui.table_next_column() if whitelisted: if imgui.button(f"Unstar##{session_id}"): registry.update_session_metadata( session_id, message_count=int(metadata.get("message_count") or 0), errors=int(metadata.get("errors") or 0), size_kb=int(metadata.get("size_kb") or 0), whitelisted=False, reason=str(metadata.get("reason") or "") ) else: if imgui.button(f"Star##{session_id}"): registry.update_session_metadata( session_id, message_count=int(metadata.get("message_count") or 0), errors=int(metadata.get("errors") or 0), size_kb=int(metadata.get("size_kb") or 0), whitelisted=True, reason="Manually whitelisted" ) imgui.end_table() if imgui.button("Force Prune Logs"): self.controller.event_queue.put("gui_task", {"action": "click", "item": "btn_prune_logs"}) imgui.end() def _render_files_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir) imgui.same_line() if imgui.button("Browse##fb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_files_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True) for i, f in enumerate(self.files): if imgui.button(f"x##f{i}"): self.files.pop(i) break imgui.same_line() imgui.text(f) imgui.end_child() if imgui.button("Add File(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames() r.destroy() for p in paths: if p not in self.files: self.files.append(p) imgui.same_line() if imgui.button("Add Wildcard"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.files.append(str(Path(d) / "**" / "*")) def _render_screenshots_panel(self) -> None: imgui.text("Base Dir") ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir) imgui.same_line() if imgui.button("Browse##sb"): r = hide_tk_root() d = filedialog.askdirectory() r.destroy() if d: self.ui_shots_base_dir = d imgui.separator() imgui.text("Paths") imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True) for i, s in enumerate(self.screenshots): if imgui.button(f"x##s{i}"): self.screenshots.pop(i) break imgui.same_line() imgui.text(s) imgui.end_child() if imgui.button("Add Screenshot(s)"): r = hide_tk_root() paths = filedialog.askopenfilenames( title="Select Screenshots", filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")], ) r.destroy() for p in paths: if p not in self.screenshots: self.screenshots.append(p) def _render_discussion_panel(self) -> None: # THINKING indicator is_thinking = self.ai_status in ["sending..."] if is_thinking: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...") imgui.separator() # Prior session viewing mode if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20)) imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION") imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() imgui.separator() imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False) clipper = imgui.ListClipper() clipper.begin(len(self.prior_session_entries)) while clipper.step(): for idx in range(clipper.display_start, clipper.display_end): entry = self.prior_session_entries[idx] imgui.push_id(f"prior_{idx}") kind = entry.get("kind", entry.get("type", "")) imgui.text_colored(C_LBL, f"#{idx+1}") imgui.same_line() ts = entry.get("ts", entry.get("timestamp", "")) if ts: imgui.text_colored(vec4(160, 160, 160), str(ts)) imgui.same_line() imgui.text_colored(C_KEY, str(kind)) payload = entry.get("payload", entry) text = payload.get("text", payload.get("message", payload.get("content", ""))) if text: preview = str(text).replace("\n", " ")[:200] if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(preview) imgui.pop_text_wrap_pos() else: imgui.text(preview) imgui.separator() imgui.pop_id() imgui.end_child() imgui.pop_style_color() return if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open): names = self._get_discussion_names() if imgui.begin_combo("##disc_sel", self.active_discussion): for name in names: is_selected = (name == self.active_discussion) if imgui.selectable(name, is_selected)[0]: self._switch_discussion(name) if is_selected: imgui.set_item_default_focus() imgui.end_combo() if self.active_track: imgui.same_line() changed, self._track_discussion_active = imgui.checkbox("Track Discussion", self._track_discussion_active) if changed: if self._track_discussion_active: self._flush_disc_entries_to_project() history_strings = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir) with self._disc_entries_lock: self.disc_entries = models.parse_history_entries(history_strings, self.disc_roles) self.ai_status = f"track discussion: {self.active_track.id}" else: self._flush_disc_entries_to_project() # Restore project discussion self._switch_discussion(self.active_discussion) disc_sec = self.project.get("discussion", {}) disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) git_commit = disc_data.get("git_commit", "") last_updated = disc_data.get("last_updated", "") imgui.text_colored(C_LBL, "commit:") imgui.same_line() imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)") imgui.same_line() if imgui.button("Update Commit"): git_dir = self.ui_project_git_dir if git_dir: cmt = project_manager.get_git_commit(git_dir) if cmt: disc_data["git_commit"] = cmt disc_data["last_updated"] = project_manager.now_ts() self.ai_status = f"commit: {cmt[:12]}" imgui.text_colored(C_LBL, "updated:") imgui.same_line() imgui.text_colored(C_SUB, last_updated if last_updated else "(never)") ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input) imgui.same_line() if imgui.button("Create"): nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Rename"): nm = self.ui_disc_new_name_input.strip() if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = "" imgui.same_line() if imgui.button("Delete"): self._delete_discussion(self.active_discussion) if not self.is_viewing_prior_session: imgui.separator() if imgui.button("+ Entry"): self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": True, "ts": project_manager.now_ts()}) imgui.same_line() if imgui.button("-All"): for e in self.disc_entries: e["collapsed"] = True imgui.same_line() if imgui.button("+All"): for e in self.disc_entries: e["collapsed"] = False imgui.same_line() if imgui.button("Clear All"): self.disc_entries.clear() imgui.same_line() if imgui.button("Save"): self._flush_to_project() self._save_active_project() self._flush_to_config() models.save_config(self.config) self.ai_status = "discussion saved" ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history) # Truncation controls imgui.text("Keep Pairs:") imgui.same_line() imgui.set_next_item_width(80) ch, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1) if self.ui_disc_truncate_pairs < 1: self.ui_disc_truncate_pairs = 1 imgui.same_line() if imgui.button("Truncate"): with self._disc_entries_lock: self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs) self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs" imgui.separator() if imgui.collapsing_header("Roles"): imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True) for i, r in enumerate(self.disc_roles): if imgui.button(f"x##r{i}"): self.disc_roles.pop(i) break imgui.same_line() imgui.text(r) imgui.end_child() ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input) imgui.same_line() if imgui.button("Add"): r = self.ui_disc_new_role_input.strip() if r and r not in self.disc_roles: self.disc_roles.append(r) self.ui_disc_new_role_input = "" imgui.separator() imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False) clipper = imgui.ListClipper() clipper.begin(len(self.disc_entries)) while clipper.step(): for i in range(clipper.display_start, clipper.display_end): entry = self.disc_entries[i] imgui.push_id(str(i)) collapsed = entry.get("collapsed", False) read_mode = entry.get("read_mode", False) if imgui.button("+" if collapsed else "-"): entry["collapsed"] = not collapsed imgui.same_line() self._render_text_viewer(f"Entry #{i+1}", entry["content"]) imgui.same_line() imgui.set_next_item_width(120) if imgui.begin_combo("##role", entry["role"]): for r in self.disc_roles: if imgui.selectable(r, r == entry["role"])[0]: entry["role"] = r imgui.end_combo() if not collapsed: imgui.same_line() if imgui.button("[Edit]" if read_mode else "[Read]"): entry["read_mode"] = not read_mode ts_str = entry.get("ts", "") if ts_str: imgui.same_line() imgui.text_colored(vec4(120, 120, 100), str(ts_str)) if collapsed: imgui.same_line() if imgui.button("Ins"): self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": True, "ts": project_manager.now_ts()}) imgui.same_line() if imgui.button("Del"): self.disc_entries.pop(i) imgui.pop_id() break # Break from inner loop, clipper will re-step imgui.same_line() preview = entry["content"].replace("\\n", " ")[:60] if len(entry["content"]) > 60: preview += "..." imgui.text_colored(vec4(160, 160, 150), preview) if not collapsed: if read_mode: imgui.begin_child("read_content", imgui.ImVec2(0, 150), True) if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x) imgui.text(entry["content"]) if self.ui_word_wrap: imgui.pop_text_wrap_pos() imgui.end_child() else: ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150)) imgui.separator() imgui.pop_id() if self._scroll_disc_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_disc_to_bottom = False imgui.end_child() def _render_provider_panel(self) -> None: imgui.text("Provider") if imgui.begin_combo("##prov", self.current_provider): for p in PROVIDERS: if imgui.selectable(p, p == self.current_provider)[0]: self.current_provider = p imgui.end_combo() imgui.separator() imgui.text("Model") imgui.same_line() if imgui.button("Fetch Models"): self._fetch_models(self.current_provider) if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)): for m in self.available_models: if imgui.selectable(m, m == self.current_model)[0]: self.current_model = m imgui.end_list_box() imgui.separator() imgui.text("Parameters") ch, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f") ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024) ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024) if self.current_provider == "gemini_cli": imgui.separator() imgui.text("Gemini CLI") sid = "None" if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: sid = ai_client._gemini_cli_adapter.session_id or "None" imgui.text(f"Session ID: {sid}") if imgui.button("Reset CLI Session"): ai_client.reset_session() imgui.text("Binary Path") ch, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path) imgui.same_line() if imgui.button("Browse##gcli"): r = hide_tk_root() p = filedialog.askopenfilename(title="Select gemini CLI binary") r.destroy() if p: self.ui_gemini_cli_path = p if ch: if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path def _render_token_budget_panel(self) -> None: imgui.text("Session Telemetry") usage = self.session_usage total = usage["input_tokens"] + usage["output_tokens"] if total == 0 and usage.get("total_tokens", 0) > 0: total = usage["total_tokens"] imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})") if usage.get("last_latency", 0.0) > 0: imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s") if usage["cache_read_input_tokens"]: imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}") if self._gemini_cache_text: imgui.text_colored(C_SUB, self._gemini_cache_text) imgui.separator() if self._token_stats_dirty: self._token_stats_dirty = False self._refresh_api_metrics({}, md_content=self._last_stable_md or None) stats = self._token_stats if not stats: imgui.text_disabled("Token stats unavailable") return pct = stats.get("utilization_pct", 0.0) current = stats.get("estimated_prompt_tokens", stats.get("total_tokens", 0)) limit = stats.get("max_prompt_tokens", 0) headroom = stats.get("headroom_tokens", max(0, limit - current)) if pct < 50.0: color = imgui.ImVec4(0.2, 0.8, 0.2, 1.0) elif pct < 80.0: color = imgui.ImVec4(1.0, 0.8, 0.0, 1.0) else: color = imgui.ImVec4(1.0, 0.2, 0.2, 1.0) imgui.push_style_color(imgui.Col_.plot_histogram, color) imgui.progress_bar(pct / 100.0, imgui.ImVec2(-1, 0), f"{pct:.1f}%") imgui.pop_style_color() imgui.text_disabled(f"{current:,} / {limit:,} tokens ({headroom:,} remaining)") sys_tok = stats.get("system_tokens", 0) tool_tok = stats.get("tools_tokens", 0) hist_tok = stats.get("history_tokens", 0) total_tok = sys_tok + tool_tok + hist_tok or 1 if imgui.begin_table("token_breakdown", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.sizing_fixed_fit): imgui.table_setup_column("Component") imgui.table_setup_column("Tokens") imgui.table_setup_column("Pct") imgui.table_headers_row() for lbl, tok in [("System", sys_tok), ("Tools", tool_tok), ("History", hist_tok)]: imgui.table_next_row() imgui.table_set_column_index(0); imgui.text(lbl) imgui.table_set_column_index(1); imgui.text(f"{tok:,}") imgui.table_set_column_index(2); imgui.text(f"{tok / total_tok * 100:.0f}%") imgui.end_table() if stats.get("would_trim"): imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.0, 1.0), "WARNING: Next call will trim history") trimmable = stats.get("trimmable_turns", 0) if trimmable: imgui.text_disabled(f"Trimmable turns: {trimmable}") msgs = stats.get("messages") if msgs: shown = 0 for msg in msgs: if shown >= 3: break if msg.get("trimmable"): role = msg.get("role", "?") toks = msg.get("tokens", 0) imgui.text_disabled(f" [{role}] ~{toks:,} tokens") shown += 1 imgui.separator() if ai_client._provider == "gemini": if ai_client._gemini_cache is not None: age = time.time() - (ai_client._gemini_cache_created_at or time.time()) ttl = ai_client._GEMINI_CACHE_TTL imgui.text_colored(C_LBL, f"Gemini Cache: ACTIVE | Age: {age:.0f}s / {ttl}s | Renews at: {ttl * 0.9:.0f}s") else: imgui.text_disabled("Gemini Cache: INACTIVE") elif ai_client._provider == "anthropic": with ai_client._anthropic_history_lock: turns = len(ai_client._anthropic_history) cache_reads = 0 for entry in reversed(ai_client.get_comms_log()): if entry.get("kind") == "response": cache_reads = (entry.get("payload") or {}).get("usage", {}).get("cache_read_input_tokens") or 0 break imgui.text_disabled("Anthropic: 4-breakpoint ephemeral caching (auto-managed)") imgui.text_disabled(f" {turns} history turns | Cache reads last call: {cache_reads:,}") def _render_message_panel(self) -> None: # LIVE indicator is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."] if is_live: val = math.sin(time.time() * 10 * math.pi) alpha = 1.0 if val > 0 else 0.0 imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, alpha), "LIVE") imgui.separator() ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40)) # Keyboard shortcuts io = imgui.get_io() ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter) ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l) if ctrl_l: self.ui_ai_input = "" imgui.separator() send_busy = False with self._send_thread_lock: if self.send_thread and self.send_thread.is_alive(): send_busy = True if (imgui.button("Gen + Send") or ctrl_enter) and not send_busy: self._handle_generate_send() imgui.same_line() if imgui.button("MD Only"): self._handle_md_only() imgui.same_line() if imgui.button("Reset"): self._handle_reset_session() imgui.same_line() if imgui.button("-> History"): if self.ui_ai_input: self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()}) def _render_response_panel(self) -> None: if self._trigger_blink: self._trigger_blink = False self._is_blinking = True self._blink_start_time = time.time() try: imgui.set_window_focus("Response") # type: ignore[call-arg] except: pass is_blinking = False if self._is_blinking: elapsed = time.time() - self._blink_start_time if elapsed > 1.5: self._is_blinking = False else: is_blinking = True val = math.sin(elapsed * 8 * math.pi) alpha = 50/255 if val > 0 else 0 imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha)) imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha)) # --- Always Render Content --- imgui.begin_child("response_scroll_area", imgui.ImVec2(0, -40), True) imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only) imgui.end_child() imgui.separator() if imgui.button("-> History"): if self.ai_response: self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": True, "ts": project_manager.now_ts()}) if is_blinking: imgui.pop_style_color(2) def _render_comms_history_panel(self) -> None: imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}") imgui.same_line() if imgui.button("Clear##comms"): ai_client.clear_comms_log() self._comms_log.clear() self._comms_log_dirty = True imgui.same_line() if imgui.button("Load Log"): self.cb_load_prior_log() if self.is_viewing_prior_session: imgui.same_line() if imgui.button("Exit Prior Session"): self.is_viewing_prior_session = False self.prior_session_entries.clear() self._comms_log_dirty = True self.ai_status = "idle" imgui.separator() imgui.text_colored(C_OUT, "OUT") imgui.same_line() imgui.text_colored(C_REQ, "request") imgui.same_line() imgui.text_colored(C_TC, "tool_call") imgui.same_line() imgui.text(" ") imgui.same_line() imgui.text_colored(C_IN, "IN") imgui.same_line() imgui.text_colored(C_RES, "response") imgui.same_line() imgui.text_colored(C_TR, "tool_result") imgui.separator() # Use tinted background for prior session if self.is_viewing_prior_session: imgui.push_style_color(imgui.Col_.child_bg, vec4(40, 30, 20)) imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar) log_to_render = self._comms_log_cache clipper = imgui.ListClipper() clipper.begin(len(log_to_render)) while clipper.step(): for i in range(clipper.display_start, clipper.display_end): entry = log_to_render[i] imgui.push_id(f"comms_entry_{i}") i_display = i + 1 ts = entry.get("ts", "00:00:00") direction = entry.get("direction", "??") kind = entry.get("kind", entry.get("type", "??")) provider = entry.get("provider", "?") model = entry.get("model", "?") tier = entry.get("source_tier", "main") payload = entry.get("payload", {}) if not payload and kind not in ("request", "response", "tool_call", "tool_result"): payload = entry # legacy # Row 1: #Idx TS DIR KIND Provider/Model [Tier] imgui.text_colored(C_LBL, f"#{i_display}") imgui.same_line() imgui.text_colored(vec4(160, 160, 160), ts) imgui.same_line() d_col = DIR_COLORS.get(direction, C_VAL) imgui.text_colored(d_col, direction) imgui.same_line() k_col = KIND_COLORS.get(kind, C_VAL) imgui.text_colored(k_col, kind) imgui.same_line() imgui.text_colored(C_LBL, f"{provider}/{model}") imgui.same_line() imgui.text_colored(C_SUB, f"[{tier}]") # Optimized content rendering using _render_heavy_text logic if kind == "request": self._render_heavy_text("message", payload.get("message", "")) elif kind == "response": r = payload.get("round", 0) sr = payload.get("stop_reason", "STOP") imgui.text_colored(C_LBL, f"round: {r} stop_reason: {sr}") self._render_heavy_text("text", payload.get("text", "")) tcs = payload.get("tool_calls", []) if tcs: self._render_heavy_text("tool_calls", json.dumps(tcs, indent=1)) elif kind == "tool_call": self._render_heavy_text(payload.get("name", "call"), payload.get("script") or json.dumps(payload.get("args", {}), indent=1)) elif kind == "tool_result": self._render_heavy_text(payload.get("name", "result"), payload.get("output", "")) else: self._render_heavy_text("data", str(payload)) imgui.separator() imgui.pop_id() if self._scroll_comms_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_comms_to_bottom = False imgui.end_child() if self.is_viewing_prior_session: imgui.pop_style_color() def _render_tool_calls_panel(self) -> None: imgui.text("Tool call history") imgui.same_line() if imgui.button("Clear##tc"): self._tool_log.clear() self._tool_log_dirty = True imgui.separator() log_to_render = self._tool_log_cache flags = imgui.TableFlags_.resizable | imgui.TableFlags_.hideable | imgui.TableFlags_.borders_inner_v | imgui.TableFlags_.row_bg | imgui.TableFlags_.scroll_y if imgui.begin_table("tool_calls_table", 4, flags, imgui.ImVec2(0, 0)): imgui.table_setup_column("#", imgui.TableColumnFlags_.width_fixed, 40) imgui.table_setup_column("Tier", imgui.TableColumnFlags_.width_fixed, 60) imgui.table_setup_column("Script", imgui.TableColumnFlags_.width_stretch) imgui.table_setup_column("Result", imgui.TableColumnFlags_.width_fixed, 100) imgui.table_headers_row() clipper = imgui.ListClipper() clipper.begin(len(log_to_render)) while clipper.step(): for i in range(clipper.display_start, clipper.display_end): entry = log_to_render[i] imgui.table_next_row() imgui.table_next_column() imgui.text_colored(C_LBL, f"#{i+1}") imgui.table_next_column() imgui.text_colored(C_SUB, f"[{entry.get('source_tier', 'main')}]") imgui.table_next_column() script = entry.get("script", "") script_preview = script.replace("\n", " ")[:150] if len(script) > 150: script_preview += "..." if imgui.selectable(f"{script_preview}##tc_{i}", False, imgui.SelectableFlags_.span_all_columns)[0]: self.text_viewer_title = f"Tool Script #{i+1}" self.text_viewer_content = script self.show_text_viewer = True imgui.table_next_column() res = entry.get("result", "") res_preview = res.replace("\n", " ")[:30] if len(res) > 30: res_preview += "..." if imgui.button(f"View##res_{i}"): self.text_viewer_title = f"Call Output #{i+1}" self.text_viewer_content = res self.show_text_viewer = True imgui.end_table() if self._scroll_tool_calls_to_bottom: imgui.set_scroll_here_y(1.0) self._scroll_tool_calls_to_bottom = False def _render_mma_dashboard(self) -> None: # Task 5.3: Dense Summary Line track_name = self.active_track.description if self.active_track else "None" total_tickets = len(self.active_tickets) done_tickets = sum(1 for t in self.active_tickets if t.get('status') == 'complete') total_cost = 0.0 for stats in self.mma_tier_usage.values(): model = stats.get('model', 'unknown') in_t = stats.get('input', 0) out_t = stats.get('output', 0) total_cost += cost_tracker.estimate_cost(model, in_t, out_t) imgui.text("Track:") imgui.same_line() imgui.text_colored(C_VAL, track_name) imgui.same_line() imgui.text(" | Tickets:") imgui.same_line() imgui.text_colored(C_VAL, f"{done_tickets}/{total_tickets}") imgui.same_line() imgui.text(" | Cost:") imgui.same_line() imgui.text_colored(imgui.ImVec4(0, 1, 0, 1), f"${total_cost:,.4f}") imgui.same_line() imgui.text(" | Status:") imgui.same_line() status_col = imgui.ImVec4(1, 1, 1, 1) if self.mma_status == "idle": status_col = imgui.ImVec4(0.7, 0.7, 0.7, 1) elif self.mma_status == "running": status_col = imgui.ImVec4(1, 1, 0, 1) elif self.mma_status == "done": status_col = imgui.ImVec4(0, 1, 0, 1) elif self.mma_status == "error": status_col = imgui.ImVec4(1, 0, 0, 1) imgui.text_colored(status_col, self.mma_status.upper()) imgui.separator() imgui.text_colored(C_LBL, 'Epic Planning (Tier 1)') _, self.ui_epic_input = imgui.input_text_multiline('##epic_input', self.ui_epic_input, imgui.ImVec2(-1, 80)) if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)): self._cb_plan_epic() imgui.separator() # 0. Conductor Setup if imgui.collapsing_header("Conductor Setup"): if imgui.button("Run Setup Scan"): self._cb_run_conductor_setup() if self.ui_conductor_setup_summary: imgui.input_text_multiline("##setup_summary", self.ui_conductor_setup_summary, imgui.ImVec2(-1, 120), imgui.InputTextFlags_.read_only) imgui.separator() # 1. Track Browser imgui.text("Track Browser") if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable): imgui.table_setup_column("Title") imgui.table_setup_column("Status") imgui.table_setup_column("Progress") imgui.table_setup_column("Actions") imgui.table_headers_row() for track in self.tracks: imgui.table_next_row() imgui.table_next_column() imgui.text(track.get("title", "Untitled")) imgui.table_next_column() status = track.get("status", "unknown").lower() if status == "new": imgui.text_colored(imgui.ImVec4(0.7, 0.7, 0.7, 1.0), "NEW") elif status == "active": imgui.text_colored(imgui.ImVec4(1.0, 1.0, 0.0, 1.0), "ACTIVE") elif status == "done": imgui.text_colored(imgui.ImVec4(0.0, 1.0, 0.0, 1.0), "DONE") elif status == "blocked": imgui.text_colored(imgui.ImVec4(1.0, 0.0, 0.0, 1.0), "BLOCKED") else: imgui.text(status) imgui.table_next_column() progress = track.get("progress", 0.0) if progress < 0.33: p_color = imgui.ImVec4(1.0, 0.0, 0.0, 1.0) elif progress < 0.66: p_color = imgui.ImVec4(1.0, 1.0, 0.0, 1.0) else: p_color = imgui.ImVec4(0.0, 1.0, 0.0, 1.0) imgui.push_style_color(imgui.Col_.plot_histogram, p_color) imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{int(progress*100)}%") imgui.pop_style_color() imgui.table_next_column() if imgui.button(f"Load##{track.get('id')}"): self._cb_load_track(str(track.get("id") or "")) imgui.end_table() # 1b. New Track Form imgui.text("Create New Track") changed_n, self.ui_new_track_name = imgui.input_text("Name##new_track", self.ui_new_track_name) changed_d, self.ui_new_track_desc = imgui.input_text_multiline("Description##new_track", self.ui_new_track_desc, imgui.ImVec2(-1, 60)) imgui.text("Type:") imgui.same_line() if imgui.begin_combo("##track_type", self.ui_new_track_type): for ttype in ["feature", "chore", "fix"]: if imgui.selectable(ttype, self.ui_new_track_type == ttype)[0]: self.ui_new_track_type = ttype imgui.end_combo() if imgui.button("Create Track"): self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type) self.ui_new_track_name = "" self.ui_new_track_desc = "" imgui.separator() # 2. Global Controls changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode) if changed: # We could push an event here if the engine needs to know immediately pass imgui.same_line() imgui.text(f"Status: {self.mma_status.upper()}") if self.active_tier: imgui.same_line() imgui.text_colored(C_VAL, f"| Active: {self.active_tier}") # Approval pending indicator any_pending = ( self._pending_mma_spawn is not None or self._pending_mma_approval is not None or self._pending_ask_dialog ) if any_pending: alpha = abs(math.sin(time.time() * 5)) imgui.same_line() imgui.text_colored(imgui.ImVec4(1.0, 0.3, 0.3, alpha), " APPROVAL PENDING") imgui.same_line() if imgui.button("Go to Approval"): pass # scroll/focus handled by existing dialog rendering imgui.separator() # 2. Active Track Info if self.active_track: imgui.text(f"Track: {self.active_track.description}") # Progress bar tickets = self.active_tickets total = len(tickets) if total > 0: complete = sum(1 for t in tickets if t.get('status') == 'complete') progress = complete / total imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets") else: imgui.text_disabled("No active MMA track.") # 3. Token Usage Table imgui.separator() imgui.text("Tier Usage (Tokens & Cost)") if imgui.begin_table("mma_usage", 5, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg): imgui.table_setup_column("Tier") imgui.table_setup_column("Model") imgui.table_setup_column("Input") imgui.table_setup_column("Output") imgui.table_setup_column("Est. Cost") imgui.table_headers_row() usage = self.mma_tier_usage total_cost = 0.0 for tier, stats in usage.items(): imgui.table_next_row() imgui.table_next_column() imgui.text(tier) imgui.table_next_column() model = stats.get('model', 'unknown') imgui.text(model) imgui.table_next_column() in_t = stats.get('input', 0) imgui.text(f"{in_t:,}") imgui.table_next_column() out_t = stats.get('output', 0) imgui.text(f"{out_t:,}") imgui.table_next_column() cost = cost_tracker.estimate_cost(model, in_t, out_t) total_cost += cost imgui.text(f"${cost:,.4f}") # Total Row imgui.table_next_row() imgui.table_set_bg_color(imgui.TableBgTarget_.row_bg0, imgui.get_color_u32(imgui.Col_.plot_lines_hovered)) imgui.table_next_column() imgui.text("TOTAL") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text("") imgui.table_next_column() imgui.text(f"${total_cost:,.4f}") imgui.end_table() imgui.separator() # 3b. Tier Model Config if imgui.collapsing_header("Tier Model Config"): for tier in self.mma_tier_usage.keys(): imgui.text(f"{tier}:") imgui.same_line() current_model = self.mma_tier_usage[tier].get("model", "unknown") current_provider = self.mma_tier_usage[tier].get("provider", "gemini") imgui.push_id(f"tier_cfg_{tier}") # Provider selection imgui.push_item_width(100) if imgui.begin_combo("##prov", current_provider): for p in PROVIDERS: if imgui.selectable(p, p == current_provider)[0]: self.mma_tier_usage[tier]["provider"] = p # Reset model to default for provider models_list = self.controller.all_available_models.get(p, []) if models_list: self.mma_tier_usage[tier]["model"] = models_list[0] imgui.end_combo() imgui.pop_item_width() imgui.same_line() # Model selection imgui.push_item_width(-1) models_list = self.controller.all_available_models.get(current_provider, []) if imgui.begin_combo("##model", current_model): for model in models_list: if imgui.selectable(model, current_model == model)[0]: self.mma_tier_usage[tier]["model"] = model imgui.end_combo() imgui.pop_item_width() imgui.pop_id() imgui.separator() # 4. Task DAG Visualizer imgui.text("Task DAG") if self.active_track and self.node_editor_ctx: ed.set_current_editor(self.node_editor_ctx) ed.begin('Visual DAG') # Selection detection selected = ed.get_selected_nodes() if selected: for node_id in selected: node_val = node_id.id() for t in self.active_tickets: if abs(hash(str(t.get('id', '')))) == node_val: self.ui_selected_ticket_id = str(t.get('id', '')) break break for t in self.active_tickets: tid = str(t.get('id', '??')) int_id = abs(hash(tid)) ed.begin_node(ed.NodeId(int_id)) imgui.text_colored(C_KEY, f"Ticket: {tid}") status = t.get('status', 'todo') s_col = C_VAL if status == 'done' or status == 'complete': s_col = C_IN elif status == 'in_progress' or status == 'running': s_col = C_OUT elif status == 'error': s_col = imgui.ImVec4(1, 0.4, 0.4, 1) imgui.text("Status: ") imgui.same_line() imgui.text_colored(s_col, status) imgui.text(f"Target: {t.get('target_file','')}") ed.begin_pin(ed.PinId(abs(hash(tid + "_in"))), ed.PinKind.input) imgui.text("->") ed.end_pin() imgui.same_line() ed.begin_pin(ed.PinId(abs(hash(tid + "_out"))), ed.PinKind.output) imgui.text("->") ed.end_pin() ed.end_node() for t in self.active_tickets: tid = str(t.get('id', '??')) for dep in t.get('depends_on', []): ed.link(ed.LinkId(abs(hash(dep + "_" + tid))), ed.PinId(abs(hash(dep + "_out"))), ed.PinId(abs(hash(tid + "_in")))) # Handle link creation if ed.begin_create(): start_pin = ed.PinId() end_pin = ed.PinId() if ed.query_new_link(start_pin, end_pin): if ed.accept_new_item(): s_id = start_pin.id() e_id = end_pin.id() source_tid = None target_tid = None for t in self.active_tickets: tid = str(t.get('id', '')) if abs(hash(tid + "_out")) == s_id: source_tid = tid if abs(hash(tid + "_out")) == e_id: source_tid = tid if abs(hash(tid + "_in")) == s_id: target_tid = tid if abs(hash(tid + "_in")) == e_id: target_tid = tid if source_tid and target_tid and source_tid != target_tid: for t in self.active_tickets: if str(t.get('id', '')) == target_tid: if source_tid not in t.get('depends_on', []): t.setdefault('depends_on', []).append(source_tid) self._push_mma_state_update() break ed.end_create() # Handle link deletion if ed.begin_delete(): link_id = ed.LinkId() while ed.query_deleted_link(link_id): if ed.accept_deleted_item(): lid_val = link_id.id() for t in self.active_tickets: tid = str(t.get('id', '')) deps = t.get('depends_on', []) if any(abs(hash(d + "_" + tid)) == lid_val for d in deps): t['depends_on'] = [dep for dep in deps if abs(hash(dep + "_" + tid)) != lid_val] self._push_mma_state_update() break ed.end_delete() # Validate DAG after any changes try: from src.dag_engine import TrackDAG ticket_dicts = [{'id': str(t.get('id', '')), 'depends_on': t.get('depends_on', [])} for t in self.active_tickets] temp_dag = TrackDAG(ticket_dicts) if temp_dag.has_cycle(): imgui.open_popup("Cycle Detected!") except Exception: pass ed.end() # 5. Add Ticket Form imgui.separator() if imgui.button("Add Ticket"): self._show_add_ticket_form = not self._show_add_ticket_form if self._show_add_ticket_form: # Default Ticket ID max_id = 0 for t in self.active_tickets: tid = t.get('id', '') if tid.startswith('T-'): try: max_id = max(max_id, int(tid[2:])) except: pass self.ui_new_ticket_id = f"T-{max_id + 1:03d}" self.ui_new_ticket_desc = "" self.ui_new_ticket_target = "" self.ui_new_ticket_deps = "" if self._show_add_ticket_form: imgui.begin_child("add_ticket_form", imgui.ImVec2(-1, 220), True) imgui.text_colored(C_VAL, "New Ticket Details") _, self.ui_new_ticket_id = imgui.input_text("ID##new_ticket", self.ui_new_ticket_id) _, self.ui_new_ticket_desc = imgui.input_text_multiline("Description##new_ticket", self.ui_new_ticket_desc, imgui.ImVec2(-1, 60)) _, self.ui_new_ticket_target = imgui.input_text("Target File##new_ticket", self.ui_new_ticket_target) _, self.ui_new_ticket_deps = imgui.input_text("Depends On (IDs, comma-separated)##new_ticket", self.ui_new_ticket_deps) if imgui.button("Create"): new_ticket = { "id": self.ui_new_ticket_id, "description": self.ui_new_ticket_desc, "status": "todo", "assigned_to": "tier3-worker", "target_file": self.ui_new_ticket_target, "depends_on": [d.strip() for d in self.ui_new_ticket_deps.split(",") if d.strip()] } self.active_tickets.append(new_ticket) self._show_add_ticket_form = False self._push_mma_state_update() imgui.same_line() if imgui.button("Cancel"): self._show_add_ticket_form = False imgui.end_child() else: imgui.text_disabled("No active MMA track.") # 6. Edit Selected Ticket if self.ui_selected_ticket_id: imgui.separator() imgui.text_colored(C_VAL, f"Editing: {self.ui_selected_ticket_id}") ticket = next((t for t in self.active_tickets if str(t.get('id', '')) == self.ui_selected_ticket_id), None) if ticket: imgui.text(f"Status: {ticket.get('status', 'todo')}") imgui.text(f"Target: {ticket.get('target_file', '')}") deps = ticket.get('depends_on', []) imgui.text(f"Depends on: {', '.join(deps)}") if imgui.button(f"Mark Complete##{self.ui_selected_ticket_id}"): ticket['status'] = 'done' self._push_mma_state_update() imgui.same_line() if imgui.button(f"Delete##{self.ui_selected_ticket_id}"): self.active_tickets = [t for t in self.active_tickets if str(t.get('id', '')) != self.ui_selected_ticket_id] self.ui_selected_ticket_id = None self._push_mma_state_update() def _render_tier_stream_panel(self, tier_key: str, stream_key: str | None) -> None: if stream_key is not None: content = self.mma_streams.get(stream_key, "") imgui.begin_child(f"##stream_content_{tier_key}", imgui.ImVec2(-1, -1)) imgui.text_wrapped(content) try: if len(content) != self._tier_stream_last_len.get(stream_key, -1): imgui.set_scroll_here_y(1.0) self._tier_stream_last_len[stream_key] = len(content) except (TypeError, AttributeError): pass imgui.end_child() else: tier3_keys = [k for k in self.mma_streams if "Tier 3" in k] if not tier3_keys: imgui.text_disabled("No worker output yet.") else: for key in tier3_keys: ticket_id = key.split(": ", 1)[-1] if ": " in key else key imgui.text(ticket_id) imgui.begin_child(f"##tier3_{ticket_id}_scroll", imgui.ImVec2(-1, 150), True) imgui.text_wrapped(self.mma_streams[key]) try: if len(self.mma_streams[key]) != self._tier_stream_last_len.get(key, -1): imgui.set_scroll_here_y(1.0) self._tier_stream_last_len[key] = len(self.mma_streams[key]) except (TypeError, AttributeError): pass imgui.end_child() def _render_system_prompts_panel(self) -> None: imgui.text("Global System Prompt (all projects)") ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100)) imgui.separator() imgui.text("Project System Prompt") ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100)) def _render_theme_panel(self) -> None: exp, opened = imgui.begin("Theme", self.show_windows["Theme"]) self.show_windows["Theme"] = bool(opened) if exp: imgui.text("Palette") cp = theme.get_current_palette() if imgui.begin_combo("##pal", cp): for p in theme.get_palette_names(): if imgui.selectable(p, p == cp)[0]: theme.apply(p) imgui.end_combo() imgui.separator() ch1, self.ui_separate_message_panel = imgui.checkbox("Separate Message Panel", self.ui_separate_message_panel) ch2, self.ui_separate_response_panel = imgui.checkbox("Separate Response Panel", self.ui_separate_response_panel) ch3, self.ui_separate_tool_calls_panel = imgui.checkbox("Separate Tool Calls Panel", self.ui_separate_tool_calls_panel) if ch1: self.show_windows["Message"] = self.ui_separate_message_panel if ch2: self.show_windows["Response"] = self.ui_separate_response_panel if ch3: self.show_windows["Tool Calls"] = self.ui_separate_tool_calls_panel imgui.separator() imgui.text("Font") imgui.push_item_width(-150) ch, path = imgui.input_text("##fontp", theme.get_current_font_path()) imgui.pop_item_width() if ch: theme._current_font_path = path imgui.same_line() if imgui.button("Browse##font"): r = hide_tk_root() p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")]) r.destroy() if p: theme._current_font_path = p imgui.text("Size (px)") imgui.same_line() imgui.push_item_width(100) ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f") if ch: theme._current_font_size = size imgui.pop_item_width() imgui.same_line() if imgui.button("Apply Font (Requires Restart)"): self._flush_to_config() models.save_config(self.config) self.ai_status = "Font settings saved. Restart required." imgui.separator() imgui.text("UI Scale (DPI)") ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f") if ch: theme.set_scale(scale) imgui.end() def _load_fonts(self) -> None: font_path, font_size = theme.get_font_loading_params() if font_path and Path(font_path).exists(): hello_imgui.load_font(font_path, font_size) def _post_init(self) -> None: theme.apply_current() def run(self) -> None: """Initializes the ImGui runner and starts the main application loop.""" if "--headless" in sys.argv: print("Headless mode active") self._fetch_models(self.current_provider) import uvicorn headless_cfg = self.config.get("headless", {}) port = headless_cfg.get("port", 8000) api = self.create_api() uvicorn.run(api, host="0.0.0.0", port=port) else: theme.load_from_config(self.config) self.runner_params = hello_imgui.RunnerParams() self.runner_params.app_window_params.window_title = "manual slop" self.runner_params.app_window_params.window_geometry.size = (1680, 1200) self.runner_params.imgui_window_params.enable_viewports = False self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space self.runner_params.fps_idling.enable_idling = False self.runner_params.imgui_window_params.show_menu_bar = True self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder self.runner_params.ini_filename = "manualslop_layout.ini" self.runner_params.callbacks.show_gui = self._gui_func self.runner_params.callbacks.show_menus = self._show_menus self.runner_params.callbacks.load_additional_fonts = self._load_fonts self.runner_params.callbacks.post_init = self._post_init self._fetch_models(self.current_provider) immapp.run(self.runner_params) # On exit self.shutdown() session_logger.close_session() def main() -> None: app = App() app.run() if __name__ == "__main__": main()