Files
manual_slop/gui_2.py
Ed_ ddb53b250f refactor(gui2): Restructure layout into discrete Hubs
Automates the refactoring of the monolithic _gui_func in gui_2.py into separate rendering methods, nested within 'Context Hub', 'AI Settings Hub', 'Discussion Hub', and 'Operations Hub', utilizing tab bars. Adds tests to ensure the new default windows correctly represent this Hub structure.
2026-02-23 22:15:13 -05:00

1698 lines
79 KiB
Python

# gui_2.py
import tomli_w
import threading
import time
import math
import json
import sys
import os
from pathlib import Path
from tkinter import filedialog, Tk
import aggregate
import ai_client
from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import theme_2 as theme
import tomllib
import events
import numpy as np
import api_hooks
import mcp_client
from performance_monitor import PerformanceMonitor
from imgui_bundle import imgui, hello_imgui, immapp
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic"]
COMMS_CLAMP_CHARS = 300
def load_config() -> dict:
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
def save_config(config: dict):
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
def hide_tk_root() -> Tk:
root = Tk()
root.withdraw()
root.wm_attributes("-topmost", True)
return root
# Color Helpers
def vec4(r, g, b, a=1.0): return imgui.ImVec4(r/255, g/255, b/255, a)
C_OUT = vec4(100, 200, 255)
C_IN = vec4(140, 255, 160)
C_REQ = vec4(255, 220, 100)
C_RES = vec4(180, 255, 180)
C_TC = vec4(255, 180, 80)
C_TR = vec4(180, 220, 255)
C_TRS = vec4(200, 180, 255)
C_LBL = vec4(180, 180, 180)
C_VAL = vec4(220, 220, 220)
C_KEY = vec4(140, 200, 255)
C_NUM = vec4(180, 255, 180)
C_SUB = vec4(220, 200, 120)
DIR_COLORS = {"OUT": C_OUT, "IN": C_IN}
KIND_COLORS = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
HEAVY_KEYS = {"message", "text", "script", "output", "content"}
DISC_ROLES = ["User", "AI", "Vendor API", "System"]
AGENT_TOOL_NAMES = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]
def truncate_entries(entries: list[dict], max_pairs: int) -> list[dict]:
if max_pairs <= 0:
return []
target_count = max_pairs * 2
if len(entries) <= target_count:
return entries
return entries[-target_count:]
def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict]:
known = roles if roles is not None else DISC_ROLES
entries = []
for raw in history:
entries.append(project_manager.str_to_entry(raw, known))
return entries
class ConfirmDialog:
_next_id = 0
def __init__(self, script: str, base_dir: str):
ConfirmDialog._next_id += 1
self._uid = ConfirmDialog._next_id
self._script = str(script) if script is not None else ""
self._base_dir = str(base_dir) if base_dir is not None else ""
self._event = threading.Event()
self._approved = False
def wait(self) -> tuple[bool, str]:
self._event.wait()
return self._approved, self._script
class App:
def __init__(self):
self.config = load_config()
ai_cfg = self.config.get("ai", {})
self.current_provider: str = ai_cfg.get("provider", "gemini")
self.current_model: str = ai_cfg.get("model", "gemini-2.0-flash")
self.available_models: list[str] = []
self.temperature: float = ai_cfg.get("temperature", 0.0)
self.max_tokens: int = ai_cfg.get("max_tokens", 8192)
self.history_trunc_limit: int = ai_cfg.get("history_trunc_limit", 8000)
projects_cfg = self.config.get("projects", {})
self.project_paths: list[str] = list(projects_cfg.get("paths", []))
self.active_project_path: str = projects_cfg.get("active", "")
self.project: dict = {}
self.active_discussion: str = "main"
self._load_active_project()
# Project-derived state
self.files: list[str] = list(self.project.get("files", {}).get("paths", []))
self.screenshots: list[str] = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
self.disc_entries: list[dict] = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
# UI State Variables
self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".")
proj_meta = self.project.get("project", {})
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_word_wrap = proj_meta.get("word_wrap", True)
self.ui_summary_only = proj_meta.get("summary_only", False)
self.ui_auto_add_history = disc_sec.get("auto_add", False)
self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "")
self.ui_ai_input = ""
self.ui_disc_new_name_input = ""
self.ui_disc_new_role_input = ""
# Last Script popup variables
self.ui_last_script_text = ""
self.ui_last_script_output = ""
self.ai_status = "idle"
self.ai_response = ""
self.last_md = ""
self.last_md_path: Path | None = None
self.last_file_items: list = []
self.send_thread: threading.Thread | None = None
self._send_thread_lock = threading.Lock()
self.models_thread: threading.Thread | None = None
_default_windows = {
"Context Hub": True,
"AI Settings Hub": True,
"Discussion Hub": True,
"Operations Hub": True,
"Diagnostics": False,
}
saved = self.config.get("gui", {}).get("show_windows", {})
self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()}
self.show_script_output = False
self.show_text_viewer = False
self.text_viewer_title = ""
self.text_viewer_content = ""
self._pending_dialog: ConfirmDialog | None = None
self._pending_dialog_open = False
self._pending_dialog_lock = threading.Lock()
self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = []
self._pending_comms: list[dict] = []
self._pending_comms_lock = threading.Lock()
self._pending_history_adds: list[dict] = []
self._pending_history_adds_lock = threading.Lock()
# Blinking
self._trigger_blink = False
self._is_blinking = False
self._blink_start_time = 0.0
self._trigger_script_blink = False
self._is_script_blinking = False
self._script_blink_start_time = 0.0
self._scroll_disc_to_bottom = False
# GUI Task Queue (thread-safe, for event handlers and hook server)
self._pending_gui_tasks: list[dict] = []
self._pending_gui_tasks_lock = threading.Lock()
# Session usage tracking
self.session_usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
# Token budget / cache telemetry
self._token_budget_pct = 0.0
self._token_budget_current = 0
self._token_budget_limit = 0
self._gemini_cache_text = ""
# Discussion truncation
self.ui_disc_truncate_pairs: int = 2
# Agent tools config
agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
self.ui_agent_tools: dict[str, bool] = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# Prior session log viewing
self.is_viewing_prior_session = False
self.prior_session_entries: list[dict] = []
# API Hooks
self.test_hooks_enabled = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
# Performance monitoring
self.perf_monitor = PerformanceMonitor()
self.perf_history = {"frame_time": [0.0]*100, "fps": [0.0]*100, "cpu": [0.0]*100, "input_lag": [0.0]*100}
self._perf_last_update = 0.0
# Auto-save timer (every 60s)
self._autosave_interval = 60.0
self._last_autosave = time.time()
session_logger.open_session()
ai_client.set_provider(self.current_provider, self.current_model)
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
# AI client event subscriptions
ai_client.events.on("request_start", self._on_api_event)
ai_client.events.on("response_received", self._on_api_event)
ai_client.events.on("tool_execution", self._on_api_event)
# ---------------------------------------------------------------- project loading
def _load_active_project(self):
if self.active_project_path and Path(self.active_project_path).exists():
try:
self.project = project_manager.load_project(self.active_project_path)
return
except Exception as e:
print(f"Failed to load project {self.active_project_path}: {e}")
for pp in self.project_paths:
if Path(pp).exists():
try:
self.project = project_manager.load_project(pp)
self.active_project_path = pp
return
except Exception:
continue
self.project = project_manager.migrate_from_legacy_config(self.config)
name = self.project.get("project", {}).get("name", "project")
fallback_path = f"{name}.toml"
project_manager.save_project(self.project, fallback_path)
self.active_project_path = fallback_path
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _switch_project(self, path: str):
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
self._refresh_from_project()
ai_client.reset_session()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self):
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
proj = self.project
self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".")
self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "")
self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "")
self.ui_project_main_context = proj.get("project", {}).get("main_context", "")
self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True)
self.ui_summary_only = proj.get("project", {}).get("summary_only", False)
agent_tools_cfg = proj.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
def _save_active_project(self):
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self.ai_status = f"save error: {e}"
# ---------------------------------------------------------------- discussion management
def _get_discussion_names(self) -> list[str]:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
return sorted(discussions.keys())
def _switch_discussion(self, name: str):
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if name not in discussions:
self.ai_status = f"discussion not found: {name}"
return
self.active_discussion = name
disc_sec["active"] = name
disc_data = discussions[name]
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
self.ai_status = f"discussion: {name}"
def _flush_disc_entries_to_project(self):
history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries]
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion())
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str):
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self.ai_status = f"discussion '{name}' already exists"
return
discussions[name] = project_manager.default_discussion()
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self.ai_status = f"discussion '{new_name}' already exists"
return
discussions[new_name] = discussions.pop(old_name)
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
self.ai_status = "cannot delete the last discussion"
return
if name not in discussions:
return
del discussions[name]
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
# ---------------------------------------------------------------- logic
def _on_comms_entry(self, entry: dict):
session_logger.log_comms(entry)
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str):
session_logger.log_tool_call(script, result, None)
def _on_api_event(self, *args, **kwargs):
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
def _process_pending_gui_tasks(self):
if not self._pending_gui_tasks:
return
with self._pending_gui_tasks_lock:
tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in tasks:
try:
action = task.get("action")
if action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}))
except Exception as e:
print(f"Error executing GUI task: {e}")
def _recalculate_session_usage(self):
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in usage.keys():
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict):
self._recalculate_session_usage()
try:
stats = ai_client.get_history_bleed_stats()
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
self._token_budget_current = stats.get("current", 0)
self._token_budget_limit = stats.get("limit", 0)
except Exception:
pass
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def cb_load_prior_log(self):
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
initialdir="logs",
filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")]
)
root.destroy()
if not path:
return
entries = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
self.ai_status = f"log load error: {e}"
return
self.prior_session_entries = entries
self.is_viewing_prior_session = True
self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)"
def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
dialog = ConfirmDialog(script, base_dir)
with self._pending_dialog_lock:
self._pending_dialog = dialog
approved, final_script = dialog.wait()
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(final_script, base_dir)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def _append_tool_log(self, script: str, result: str):
self._tool_log.append((script, result))
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
def _flush_to_project(self):
proj = self.project
proj.setdefault("output", {})["output_dir"] = self.ui_output_dir
proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir
proj["files"]["paths"] = self.files
proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir
proj["screenshots"]["paths"] = self.screenshots
proj.setdefault("project", {})
proj["project"]["git_dir"] = self.ui_project_git_dir
proj["project"]["system_prompt"] = self.ui_project_system_prompt
proj["project"]["main_context"] = self.ui_project_main_context
proj["project"]["word_wrap"] = self.ui_word_wrap
proj["project"]["summary_only"] = self.ui_summary_only
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in AGENT_TOOL_NAMES:
proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True)
self._flush_disc_entries_to_project()
disc_sec = proj.setdefault("discussion", {})
disc_sec["roles"] = self.disc_roles
disc_sec["active"] = self.active_discussion
disc_sec["auto_add"] = self.ui_auto_add_history
def _flush_to_config(self):
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"history_trunc_limit": self.history_trunc_limit,
}
self.config["ai"]["system_prompt"] = self.ui_global_system_prompt
self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path}
self.config["gui"] = {"show_windows": self.show_windows}
theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list, str, str]:
"""Returns (full_md, output_path, file_items, stable_md, discussion_text)."""
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
flat = project_manager.flat_config(self.project, self.active_discussion)
full_md, path, file_items = aggregate.run(flat)
# Build stable markdown (no history) for Gemini caching
screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", "."))
screenshots = flat.get("screenshots", {}).get("paths", [])
summary_only = flat.get("project", {}).get("summary_only", False)
stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only)
# Build discussion history text separately
history = flat.get("discussion", {}).get("history", [])
discussion_text = aggregate.build_discussion_text(history)
return full_md, path, file_items, stable_md, discussion_text
def _fetch_models(self, provider: str):
self.ai_status = "fetching models..."
def do_fetch():
try:
models = ai_client.list_models(provider)
self.available_models = models
if self.current_model not in models and models:
self.current_model = models[0]
ai_client.set_provider(self.current_provider, self.current_model)
self.ai_status = f"models loaded: {len(models)}"
except Exception as e:
self.ai_status = f"model fetch error: {e}"
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str):
if imgui.button("[+]##" + str(id(content))):
self.show_text_viewer = True
self.text_viewer_title = label
self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str):
imgui.text_colored(C_LBL, f"{label}:")
imgui.same_line()
self._render_text_viewer(label, content)
if len(content) > COMMS_CLAMP_CHARS:
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(content)
imgui.pop_text_wrap_pos()
else:
imgui.input_text_multiline(f"##{id(content)}", content, imgui.ImVec2(-1, 80), imgui.InputTextFlags_.read_only)
else:
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(content if content else "(empty)")
imgui.pop_text_wrap_pos()
else:
imgui.text(content if content else "(empty)")
# ---------------------------------------------------------------- gui
def _show_menus(self):
if imgui.begin_menu("Windows"):
for w in self.show_windows.keys():
_, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w])
imgui.end_menu()
if imgui.begin_menu("Project"):
if imgui.menu_item("Save All", "", False)[0]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
if imgui.menu_item("Reset Session", "", False)[0]:
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.ai_status = "session reset"
self.ai_response = ""
if imgui.menu_item("Generate MD Only", "", False)[0]:
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
except Exception as e:
self.ai_status = f"error: {e}"
imgui.end_menu()
def _gui_func(self):
self.perf_monitor.start_frame()
# Process GUI task queue
self._process_pending_gui_tasks()
# Auto-save (every 60s)
now = time.time()
if now - self._last_autosave >= self._autosave_interval:
self._last_autosave = now
try:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
except Exception:
pass # silent — don't disrupt the GUI loop
# Sync pending comms
with self._pending_comms_lock:
for c in self._pending_comms:
self._comms_log.append(c)
self._pending_comms.clear()
with self._pending_history_adds_lock:
if self._pending_history_adds:
self._scroll_disc_to_bottom = True
for item in self._pending_history_adds:
if item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
self.disc_entries.append(item)
self._pending_history_adds.clear()
# if imgui.begin_main_menu_bar():
# if imgui.begin_menu("Windows"):
# for w in self.show_windows.keys():
# _, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w])
# imgui.end_menu()
# if imgui.begin_menu("Project"):
# if imgui.menu_item("Save All", "", False)[0]:
# self._flush_to_project()
# self._save_active_project()
# self._flush_to_config()
# save_config(self.config)
# self.ai_status = "config saved"
# if imgui.menu_item("Reset Session", "", False)[0]:
# ai_client.reset_session()
# ai_client.clear_comms_log()
# self._tool_log.clear()
# self._comms_log.clear()
# self.ai_status = "session reset"
# self.ai_response = ""
# if imgui.menu_item("Generate MD Only", "", False)[0]:
# try:
# md, path, *_ = self._do_generate()
# self.last_md = md
# self.last_md_path = path
# self.ai_status = f"md written: {path.name}"
# except Exception as e:
# self.ai_status = f"error: {e}"
# imgui.end_menu()
# imgui.end_main_menu_bar()
# ---- Context Hub
if self.show_windows.get("Context Hub", False):
exp, self.show_windows["Context Hub"] = imgui.begin("Context Hub", self.show_windows["Context Hub"])
if exp:
if imgui.begin_tab_bar("ContextTabs"):
if imgui.begin_tab_item("Projects")[0]:
self._render_projects_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Files")[0]:
self._render_files_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Screenshots")[0]:
self._render_screenshots_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
# ---- AI Settings Hub
if self.show_windows.get("AI Settings Hub", False):
exp, self.show_windows["AI Settings Hub"] = imgui.begin("AI Settings Hub", self.show_windows["AI Settings Hub"])
if exp:
if imgui.begin_tab_bar("AISettingsTabs"):
if imgui.begin_tab_item("Provider")[0]:
self._render_provider_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("System Prompts")[0]:
self._render_system_prompts_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Theme")[0]:
self._render_theme_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
# ---- Discussion Hub
if self.show_windows.get("Discussion Hub", False):
exp, self.show_windows["Discussion Hub"] = imgui.begin("Discussion Hub", self.show_windows["Discussion Hub"])
if exp:
if imgui.begin_tab_bar("DiscussionTabs"):
if imgui.begin_tab_item("History")[0]:
self._render_discussion_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
# ---- Operations Hub
if self.show_windows.get("Operations Hub", False):
exp, self.show_windows["Operations Hub"] = imgui.begin("Operations Hub", self.show_windows["Operations Hub"])
if exp:
if imgui.begin_tab_bar("OperationsTabs"):
if imgui.begin_tab_item("Message")[0]:
self._render_message_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Response")[0]:
self._render_response_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Tool Calls")[0]:
self._render_tool_calls_panel()
imgui.end_tab_item()
if imgui.begin_tab_item("Comms History")[0]:
self._render_comms_history_panel()
imgui.end_tab_item()
imgui.end_tab_bar()
imgui.end()
# ---- Diagnostics
if self.show_windows["Diagnostics"]:
exp, self.show_windows["Diagnostics"] = imgui.begin("Diagnostics", self.show_windows["Diagnostics"])
if exp:
now = time.time()
if now - self._perf_last_update >= 0.5:
self._perf_last_update = now
metrics = self.perf_monitor.get_metrics()
self.perf_history["frame_time"].pop(0)
self.perf_history["frame_time"].append(metrics.get("last_frame_time_ms", 0.0))
self.perf_history["fps"].pop(0)
self.perf_history["fps"].append(metrics.get("fps", 0.0))
self.perf_history["cpu"].pop(0)
self.perf_history["cpu"].append(metrics.get("cpu_percent", 0.0))
self.perf_history["input_lag"].pop(0)
self.perf_history["input_lag"].append(metrics.get("input_lag_ms", 0.0))
metrics = self.perf_monitor.get_metrics()
imgui.text("Performance Telemetry")
imgui.separator()
if imgui.begin_table("perf_table", 2, imgui.TableFlags_.borders_inner_h):
imgui.table_setup_column("Metric")
imgui.table_setup_column("Value")
imgui.table_headers_row()
imgui.table_next_row()
imgui.table_next_column()
imgui.text("FPS")
imgui.table_next_column()
imgui.text(f"{metrics.get('fps', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Frame Time (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('last_frame_time_ms', 0.0):.2f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("CPU %")
imgui.table_next_column()
imgui.text(f"{metrics.get('cpu_percent', 0.0):.1f}")
imgui.table_next_row()
imgui.table_next_column()
imgui.text("Input Lag (ms)")
imgui.table_next_column()
imgui.text(f"{metrics.get('input_lag_ms', 0.0):.1f}")
imgui.end_table()
imgui.separator()
imgui.text("Frame Time (ms)")
imgui.plot_lines("##ft_plot", np.array(self.perf_history["frame_time"], dtype=np.float32), overlay_text="frame_time", graph_size=imgui.ImVec2(-1, 60))
imgui.text("CPU %")
imgui.plot_lines("##cpu_plot", np.array(self.perf_history["cpu"], dtype=np.float32), overlay_text="cpu", graph_size=imgui.ImVec2(-1, 60))
imgui.end()
self.perf_monitor.end_frame()
# ---- Modals / Popups
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
if not self._pending_dialog_open:
imgui.open_popup("Approve PowerShell Command")
self._pending_dialog_open = True
else:
self._pending_dialog_open = False
if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]:
if dlg:
imgui.text("The AI wants to run the following PowerShell script:")
imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}")
imgui.separator()
if imgui.button("[+ Maximize]##confirm"):
self.show_text_viewer = True
self.text_viewer_title = "Confirm Script"
self.text_viewer_content = dlg._script
ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 300))
imgui.separator()
if imgui.button("Approve & Run", imgui.ImVec2(120, 0)):
dlg._approved = True
dlg._event.set()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(120, 0)):
dlg._approved = False
dlg._event.set()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
self._is_script_blinking = True
self._script_blink_start_time = time.time()
imgui.set_window_focus_str("Last Script Output")
if self._is_script_blinking:
elapsed = time.time() - self._script_blink_start_time
if elapsed > 1.5:
self._is_script_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 60/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha))
imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever)
expanded, self.show_script_output = imgui.begin("Last Script Output", self.show_script_output)
if expanded:
imgui.text("Script:")
imgui.same_line()
self._render_text_viewer("Last Script", self.ui_last_script_text)
if self.ui_word_wrap:
imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_text)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.text("Output:")
imgui.same_line()
self._render_text_viewer("Last Output", self.ui_last_script_output)
if self.ui_word_wrap:
imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_output)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
if self._is_script_blinking:
imgui.pop_style_color(2)
imgui.end()
if self.show_text_viewer:
imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever)
expanded, self.show_text_viewer = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer)
if expanded:
if self.ui_word_wrap:
imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.text_viewer_content)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end()
def _render_projects_panel(self):
proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem)
imgui.text_colored(C_IN, f"Active: {proj_name}")
imgui.separator()
imgui.text("Git Directory")
ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir)
imgui.same_line()
if imgui.button("Browse##git"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Git Directory")
r.destroy()
if d: self.ui_project_git_dir = d
imgui.separator()
imgui.text("Main Context File")
ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context)
imgui.same_line()
if imgui.button("Browse##ctx"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select Main Context File")
r.destroy()
if p: self.ui_project_main_context = p
imgui.separator()
imgui.text("Output Dir")
ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir)
imgui.same_line()
if imgui.button("Browse##out"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Output Dir")
r.destroy()
if d: self.ui_output_dir = d
imgui.separator()
imgui.text("Project Files")
imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True)
for i, pp in enumerate(self.project_paths):
is_active = (pp == self.active_project_path)
if imgui.button(f"x##p{i}"):
removed = self.project_paths.pop(i)
if removed == self.active_project_path and self.project_paths:
self._switch_project(self.project_paths[0])
break
imgui.same_line()
marker = " *" if is_active else ""
if is_active: imgui.push_style_color(imgui.Col_.text, C_IN)
if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"):
self._switch_project(pp)
if is_active: imgui.pop_style_color()
imgui.same_line()
imgui.text_colored(C_LBL, pp)
imgui.end_child()
if imgui.button("Add Project"):
r = hide_tk_root()
p = filedialog.askopenfilename(
title="Select Project .toml",
filetypes=[("TOML", "*.toml"), ("All", "*.*")],
)
r.destroy()
if p and p not in self.project_paths:
self.project_paths.append(p)
imgui.same_line()
if imgui.button("New Project"):
r = hide_tk_root()
p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")])
r.destroy()
if p:
name = Path(p).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, p)
if p not in self.project_paths:
self.project_paths.append(p)
self._switch_project(p)
imgui.same_line()
if imgui.button("Save All"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap)
ch, self.ui_summary_only = imgui.checkbox("Summary Only (send file structure, not full content)", self.ui_summary_only)
if imgui.collapsing_header("Agent Tools"):
for t_name in AGENT_TOOL_NAMES:
val = self.ui_agent_tools.get(t_name, True)
ch, val = imgui.checkbox(f"Enable {t_name}", val)
if ch:
self.ui_agent_tools[t_name] = val
def _render_files_panel(self):
imgui.text("Base Dir")
ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
imgui.same_line()
if imgui.button("Browse##fb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_files_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True)
for i, f in enumerate(self.files):
if imgui.button(f"x##f{i}"):
self.files.pop(i)
break
imgui.same_line()
imgui.text(f)
imgui.end_child()
if imgui.button("Add File(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames()
r.destroy()
for p in paths:
if p not in self.files: self.files.append(p)
imgui.same_line()
if imgui.button("Add Wildcard"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.files.append(str(Path(d) / "**" / "*"))
def _render_screenshots_panel(self):
imgui.text("Base Dir")
ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
imgui.same_line()
if imgui.button("Browse##sb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_shots_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True)
for i, s in enumerate(self.screenshots):
if imgui.button(f"x##s{i}"):
self.screenshots.pop(i)
break
imgui.same_line()
imgui.text(s)
imgui.end_child()
if imgui.button("Add Screenshot(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames(
title="Select Screenshots",
filetypes=[("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"), ("All", "*.*")],
)
r.destroy()
for p in paths:
if p not in self.screenshots: self.screenshots.append(p)
def _render_discussion_panel(self):
# THINKING indicator
is_thinking = self.ai_status in ["sending..."]
if is_thinking:
val = math.sin(time.time() * 10 * math.pi)
alpha = 1.0 if val > 0 else 0.0
imgui.text_colored(imgui.ImVec4(1.0, 0.39, 0.39, alpha), "THINKING...")
imgui.separator()
# Prior session viewing mode
if self.is_viewing_prior_session:
imgui.push_style_color(imgui.Col_.child_bg, vec4(50, 40, 20))
imgui.text_colored(vec4(255, 200, 100), "VIEWING PRIOR SESSION")
imgui.same_line()
if imgui.button("Exit Prior Session"):
self.is_viewing_prior_session = False
self.prior_session_entries.clear()
imgui.separator()
imgui.begin_child("prior_scroll", imgui.ImVec2(0, 0), False)
for idx, entry in enumerate(self.prior_session_entries):
imgui.push_id(f"prior_{idx}")
kind = entry.get("kind", entry.get("type", ""))
imgui.text_colored(C_LBL, f"#{idx+1}")
imgui.same_line()
ts = entry.get("ts", entry.get("timestamp", ""))
if ts:
imgui.text_colored(vec4(160, 160, 160), str(ts))
imgui.same_line()
imgui.text_colored(C_KEY, str(kind))
payload = entry.get("payload", entry)
text = payload.get("text", payload.get("message", payload.get("content", "")))
if text:
preview = str(text).replace("\n", " ")[:200]
if self.ui_word_wrap:
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(preview)
imgui.pop_text_wrap_pos()
else:
imgui.text(preview)
imgui.separator()
imgui.pop_id()
imgui.end_child()
imgui.pop_style_color()
if not self.is_viewing_prior_session and imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open):
names = self._get_discussion_names()
if imgui.begin_combo("##disc_sel", self.active_discussion):
for name in names:
is_selected = (name == self.active_discussion)
if imgui.selectable(name, is_selected)[0]:
self._switch_discussion(name)
if is_selected:
imgui.set_item_default_focus()
imgui.end_combo()
disc_sec = self.project.get("discussion", {})
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
git_commit = disc_data.get("git_commit", "")
last_updated = disc_data.get("last_updated", "")
imgui.text_colored(C_LBL, "commit:")
imgui.same_line()
imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)")
imgui.same_line()
if imgui.button("Update Commit"):
git_dir = self.ui_project_git_dir
if git_dir:
cmt = project_manager.get_git_commit(git_dir)
if cmt:
disc_data["git_commit"] = cmt
disc_data["last_updated"] = project_manager.now_ts()
self.ai_status = f"commit: {cmt[:12]}"
imgui.text_colored(C_LBL, "updated:")
imgui.same_line()
imgui.text_colored(C_SUB, last_updated if last_updated else "(never)")
ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input)
imgui.same_line()
if imgui.button("Create"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._create_discussion(nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Rename"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Delete"):
self._delete_discussion(self.active_discussion)
if not self.is_viewing_prior_session:
imgui.separator()
if imgui.button("+ Entry"):
self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
if imgui.button("-All"):
for e in self.disc_entries: e["collapsed"] = True
imgui.same_line()
if imgui.button("+All"):
for e in self.disc_entries: e["collapsed"] = False
imgui.same_line()
if imgui.button("Clear All"):
self.disc_entries.clear()
imgui.same_line()
if imgui.button("Save"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "discussion saved"
imgui.same_line()
if imgui.button("Load Log"):
self.cb_load_prior_log()
ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history)
# Truncation controls
imgui.text("Keep Pairs:")
imgui.same_line()
imgui.set_next_item_width(80)
ch, self.ui_disc_truncate_pairs = imgui.input_int("##trunc_pairs", self.ui_disc_truncate_pairs, 1)
if self.ui_disc_truncate_pairs < 1: self.ui_disc_truncate_pairs = 1
imgui.same_line()
if imgui.button("Truncate"):
self.disc_entries = truncate_entries(self.disc_entries, self.ui_disc_truncate_pairs)
self.ai_status = f"history truncated to {self.ui_disc_truncate_pairs} pairs"
imgui.separator()
if imgui.collapsing_header("Roles"):
imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True)
for i, r in enumerate(self.disc_roles):
if imgui.button(f"x##r{i}"):
self.disc_roles.pop(i)
break
imgui.same_line()
imgui.text(r)
imgui.end_child()
ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input)
imgui.same_line()
if imgui.button("Add"):
r = self.ui_disc_new_role_input.strip()
if r and r not in self.disc_roles:
self.disc_roles.append(r)
self.ui_disc_new_role_input = ""
imgui.separator()
imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False)
for i, entry in enumerate(self.disc_entries):
imgui.push_id(str(i))
collapsed = entry.get("collapsed", False)
read_mode = entry.get("read_mode", False)
if imgui.button("+" if collapsed else "-"):
entry["collapsed"] = not collapsed
imgui.same_line()
imgui.set_next_item_width(120)
if imgui.begin_combo("##role", entry["role"]):
for r in self.disc_roles:
if imgui.selectable(r, r == entry["role"])[0]:
entry["role"] = r
imgui.end_combo()
if not collapsed:
imgui.same_line()
if imgui.button("[Edit]" if read_mode else "[Read]"):
entry["read_mode"] = not read_mode
ts_str = entry.get("ts", "")
if ts_str:
imgui.same_line()
imgui.text_colored(vec4(120, 120, 100), ts_str)
if collapsed:
imgui.same_line()
if imgui.button("Ins"):
self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
self._render_text_viewer(f"Entry #{i+1}", entry["content"])
imgui.same_line()
if imgui.button("Del"):
self.disc_entries.pop(i)
imgui.pop_id()
break
imgui.same_line()
preview = entry["content"].replace("\n", " ")[:60]
if len(entry["content"]) > 60: preview += "..."
imgui.text_colored(vec4(160, 160, 150), preview)
if not collapsed:
if read_mode:
imgui.begin_child("read_content", imgui.ImVec2(0, 150), True)
if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(entry["content"])
if self.ui_word_wrap: imgui.pop_text_wrap_pos()
imgui.end_child()
else:
ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150))
imgui.separator()
imgui.pop_id()
if self._scroll_disc_to_bottom:
imgui.set_scroll_here_y(1.0)
self._scroll_disc_to_bottom = False
imgui.end_child()
def _render_provider_panel(self):
imgui.text("Provider")
if imgui.begin_combo("##prov", self.current_provider):
for p in PROVIDERS:
if imgui.selectable(p, p == self.current_provider)[0]:
self.current_provider = p
ai_client.reset_session()
ai_client.set_provider(p, self.current_model)
self.available_models = []
self._fetch_models(p)
imgui.end_combo()
imgui.separator()
imgui.text("Model")
imgui.same_line()
if imgui.button("Fetch Models"):
self._fetch_models(self.current_provider)
if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)):
for m in self.available_models:
if imgui.selectable(m, m == self.current_model)[0]:
self.current_model = m
ai_client.reset_session()
ai_client.set_provider(self.current_provider, m)
imgui.end_list_box()
imgui.separator()
imgui.text("Parameters")
ch, self.temperature = imgui.slider_float("Temperature", self.temperature, 0.0, 2.0, "%.2f")
ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024)
ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024)
imgui.separator()
imgui.text("Telemetry")
usage = self.session_usage
total = usage["input_tokens"] + usage["output_tokens"]
imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})")
if usage["cache_read_input_tokens"]:
imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}")
imgui.text("Token Budget:")
imgui.progress_bar(self._token_budget_pct, imgui.ImVec2(-1, 0), f"{self._token_budget_current:,} / {self._token_budget_limit:,}")
if self._gemini_cache_text:
imgui.text_colored(C_SUB, self._gemini_cache_text)
def _render_message_panel(self):
# LIVE indicator
is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
if is_live:
val = math.sin(time.time() * 10 * math.pi)
alpha = 1.0 if val > 0 else 0.0
imgui.text_colored(imgui.ImVec4(0.39, 1.0, 0.39, alpha), "LIVE")
imgui.separator()
ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40))
# Keyboard shortcuts
io = imgui.get_io()
ctrl_enter = io.key_ctrl and imgui.is_key_pressed(imgui.Key.enter)
ctrl_l = io.key_ctrl and imgui.is_key_pressed(imgui.Key.l)
if ctrl_l:
self.ui_ai_input = ""
imgui.separator()
send_busy = False
with self._send_thread_lock:
if self.send_thread and self.send_thread.is_alive():
send_busy = True
if imgui.button("Gen + Send") or ctrl_enter:
if not send_busy:
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
else:
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
send_md = stable_md
send_disc = disc_text
def do_send():
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts()})
try:
resp = ai_client.send(send_md, user_msg, base_dir, self.last_file_items, send_disc)
self.ai_response = resp
self.ai_status = "done"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts()})
except ProviderError as e:
self.ai_response = e.ui_message()
self.ai_status = "error"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "Vendor API", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
except Exception as e:
self.ai_response = f"ERROR: {e}"
self.ai_status = "error"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "System", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
with self._send_thread_lock:
self.send_thread = threading.Thread(target=do_send, daemon=True)
self.send_thread.start()
imgui.same_line()
if imgui.button("MD Only"):
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
except Exception as e:
self.ai_status = f"error: {e}"
imgui.same_line()
if imgui.button("Reset"):
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.ai_status = "session reset"
self.ai_response = ""
imgui.same_line()
if imgui.button("-> History"):
if self.ui_ai_input:
self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()})
def _render_response_panel(self):
if self._trigger_blink:
self._trigger_blink = False
self._is_blinking = True
self._blink_start_time = time.time()
imgui.set_window_focus_str("Response")
if self._is_blinking:
elapsed = time.time() - self._blink_start_time
if elapsed > 1.5:
self._is_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 50/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha))
if self.ui_word_wrap:
imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ai_response)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only)
imgui.separator()
if imgui.button("-> History"):
if self.ai_response:
self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
if self._is_blinking:
imgui.pop_style_color(2)
def _render_tool_calls_panel(self):
imgui.text("Tool call history")
imgui.same_line()
if imgui.button("Clear##tc"):
self._tool_log.clear()
imgui.separator()
imgui.begin_child("tc_scroll")
for i, (script, result) in enumerate(self._tool_log, 1):
first_line = script.strip().splitlines()[0][:80] if script.strip() else "(empty)"
imgui.text_colored(C_KEY, f"Call #{i}: {first_line}")
imgui.same_line()
self._render_text_viewer(f"Call Script #{i}", script)
imgui.same_line()
self._render_text_viewer(f"Call Output #{i}", result)
if self.ui_word_wrap:
imgui.begin_child(f"tc_wrap_{i}", imgui.ImVec2(-1, 72), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(result)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline(f"##tc_res_{i}", result, imgui.ImVec2(-1, 72), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.end_child()
def _render_comms_history_panel(self):
imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
imgui.same_line()
if imgui.button("Clear##comms"):
ai_client.clear_comms_log()
self._comms_log.clear()
imgui.separator()
imgui.text_colored(C_OUT, "OUT")
imgui.same_line()
imgui.text_colored(C_REQ, "request")
imgui.same_line()
imgui.text_colored(C_TC, "tool_call")
imgui.same_line()
imgui.text(" ")
imgui.same_line()
imgui.text_colored(C_IN, "IN")
imgui.same_line()
imgui.text_colored(C_RES, "response")
imgui.same_line()
imgui.text_colored(C_TR, "tool_result")
imgui.separator()
imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)
for idx, entry in enumerate(self._comms_log, 1):
imgui.push_id(f"comms_{idx}")
d = entry["direction"]
k = entry["kind"]
imgui.text_colored(vec4(160, 160, 160), f"#{idx}")
imgui.same_line()
imgui.text_colored(vec4(160, 160, 160), entry["ts"])
imgui.same_line()
imgui.text_colored(DIR_COLORS.get(d, C_VAL), d)
imgui.same_line()
imgui.text_colored(KIND_COLORS.get(k, C_VAL), k)
imgui.same_line()
imgui.text_colored(C_LBL, f"{entry['provider']}/{entry['model']}")
payload = entry["payload"]
if k == "request":
self._render_heavy_text("message", payload.get("message", ""))
elif k == "response":
imgui.text_colored(C_LBL, "round:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("round", "")))
imgui.text_colored(C_LBL, "stop_reason:")
imgui.same_line()
imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", "")))
text = payload.get("text", "")
if text:
self._render_heavy_text("text", text)
imgui.text_colored(C_LBL, "tool_calls:")
tcs = payload.get("tool_calls", [])
if not tcs:
imgui.text_colored(C_VAL, " (none)")
for i, tc in enumerate(tcs):
imgui.text_colored(C_KEY, f" call[{i}] {tc.get('name', '?')}")
if "id" in tc:
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(tc["id"]))
args = tc.get("args") or tc.get("input") or {}
if isinstance(args, dict):
for ak, av in args.items():
self._render_heavy_text(f" {ak}", str(av))
elif args:
self._render_heavy_text(" args", str(args))
usage = payload.get("usage")
if usage:
imgui.text_colored(C_SUB, "usage:")
for uk, uv in usage.items():
imgui.text_colored(C_LBL, f" {uk.replace('_', ' ')}:")
imgui.same_line()
imgui.text_colored(C_NUM, str(uv))
elif k == "tool_call":
imgui.text_colored(C_LBL, "name:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("name", "")))
if "id" in payload:
imgui.text_colored(C_LBL, "id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "script" in payload:
self._render_heavy_text("script", payload.get("script", ""))
elif "args" in payload:
args = payload["args"]
if isinstance(args, dict):
for ak, av in args.items():
self._render_heavy_text(ak, str(av))
else:
self._render_heavy_text("args", str(args))
elif k == "tool_result":
imgui.text_colored(C_LBL, "name:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("name", "")))
if "id" in payload:
imgui.text_colored(C_LBL, "id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
self._render_heavy_text("output", payload.get("output", ""))
elif k == "tool_result_send":
for i, r in enumerate(payload.get("results", [])):
imgui.text_colored(C_KEY, f"result[{i}]")
imgui.text_colored(C_LBL, " tool_use_id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(r.get("tool_use_id", "")))
self._render_heavy_text(" content", str(r.get("content", "")))
else:
for key, val in payload.items():
vstr = json.dumps(val, ensure_ascii=False, indent=2) if isinstance(val, (dict, list)) else str(val)
if key in HEAVY_KEYS:
self._render_heavy_text(key, vstr)
else:
imgui.text_colored(C_LBL, f"{key}:")
imgui.same_line()
imgui.text_colored(C_VAL, vstr)
imgui.separator()
imgui.pop_id()
imgui.end_child()
def _render_system_prompts_panel(self):
imgui.text("Global System Prompt (all projects)")
ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100))
imgui.separator()
imgui.text("Project System Prompt")
ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100))
def _render_theme_panel(self):
imgui.text("Palette")
cp = theme.get_current_palette()
if imgui.begin_combo("##pal", cp):
for p in theme.get_palette_names():
if imgui.selectable(p, p == cp)[0]:
theme.apply(p)
imgui.end_combo()
imgui.separator()
imgui.text("Font")
imgui.push_item_width(-150)
ch, path = imgui.input_text("##fontp", theme.get_current_font_path())
imgui.pop_item_width()
if ch: theme._current_font_path = path
imgui.same_line()
if imgui.button("Browse##font"):
r = hide_tk_root()
p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")])
r.destroy()
if p: theme._current_font_path = p
imgui.text("Size (px)")
imgui.same_line()
imgui.push_item_width(100)
ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f")
if ch: theme._current_font_size = size
imgui.pop_item_width()
imgui.same_line()
if imgui.button("Apply Font (Requires Restart)"):
self._flush_to_config()
save_config(self.config)
self.ai_status = "Font settings saved. Restart required."
imgui.separator()
imgui.text("UI Scale (DPI)")
ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f")
if ch: theme.set_scale(scale)
def _load_fonts(self):
font_path, font_size = theme.get_font_loading_params()
if font_path and Path(font_path).exists():
hello_imgui.load_font(font_path, font_size)
def _post_init(self):
theme.apply_current()
def run(self):
theme.load_from_config(self.config)
self.runner_params = hello_imgui.RunnerParams()
self.runner_params.app_window_params.window_title = "manual slop"
self.runner_params.app_window_params.window_geometry.size = (1680, 1200)
self.runner_params.imgui_window_params.enable_viewports = True
self.runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
self.runner_params.imgui_window_params.show_menu_bar = True
self.runner_params.ini_folder_type = hello_imgui.IniFolderType.current_folder
self.runner_params.ini_filename = "manualslop_layout.ini"
self.runner_params.callbacks.show_gui = self._gui_func
self.runner_params.callbacks.show_menus = self._show_menus
self.runner_params.callbacks.load_additional_fonts = self._load_fonts
self.runner_params.callbacks.post_init = self._post_init
self._fetch_models(self.current_provider)
# Start API hooks server (if enabled)
self.hook_server = api_hooks.HookServer(self)
self.hook_server.start()
immapp.run(self.runner_params)
# On exit
self.hook_server.stop()
self.perf_monitor.stop()
ai_client.cleanup() # Destroy active API caches to stop billing
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
session_logger.close_session()
def main():
app = App()
app.run()
if __name__ == "__main__":
main()