Files
manual_slop/gui_2.py

1277 lines
59 KiB
Python

# gui_2.py
import tomli_w
import threading
import time
import math
import json
from pathlib import Path
from tkinter import filedialog, Tk
import aggregate
import ai_client
from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import theme_2 as theme
import tomllib
from imgui_bundle import imgui, hello_imgui, immapp
# Location of the application-level TOML config; read by load_config() and
# written by save_config().
CONFIG_PATH = Path("config.toml")
# Provider names offered in the "Provider" window's combo box.
PROVIDERS = ["gemini", "anthropic"]
# Character-count threshold: _render_heavy_text treats content longer than
# this as "long" and, with word-wrap off, shows it in a read-only multiline box.
COMMS_CLAMP_CHARS = 300
def load_config() -> dict:
    """Parse the TOML file at ``CONFIG_PATH`` and return it as a dict.

    The file is opened in binary mode, as ``tomllib.load`` requires. Any error
    from opening or parsing the file propagates to the caller.
    """
    with open(CONFIG_PATH, "rb") as handle:
        parsed = tomllib.load(handle)
    return parsed
def save_config(config: dict):
    """Serialise ``config`` as TOML and overwrite the file at ``CONFIG_PATH``.

    The file is opened in binary mode because ``tomli_w.dump`` is handed a
    binary file object here.
    """
    with open(CONFIG_PATH, "wb") as handle:
        tomli_w.dump(config, handle)
def hide_tk_root() -> Tk:
    """Create a withdrawn, always-on-top Tk root and return it.

    Callers in this file create the root right before opening a
    ``tkinter.filedialog`` and call ``destroy()`` on it afterwards.
    """
    hidden_root = Tk()
    # Keep the empty root window off-screen.
    hidden_root.withdraw()
    hidden_root.wm_attributes("-topmost", True)
    return hidden_root
# Color Helpers
def vec4(r, g, b, a=1.0):
    """Build an ``imgui.ImVec4`` colour from 0-255 RGB components.

    Each of ``r``, ``g`` and ``b`` is divided by 255; ``a`` is passed through
    unchanged (default ``1.0``).
    """
    red, green, blue = r / 255, g / 255, b / 255
    return imgui.ImVec4(red, green, blue, a)
# Colour palette: every C_* constant is an ImVec4 built from 0-255 RGB via vec4().
# Direction colours (see DIR_COLORS).
C_OUT = vec4(100, 200, 255)
C_IN = vec4(140, 255, 160)
# Per-kind colours (see KIND_COLORS).
C_REQ = vec4(255, 220, 100)
C_RES = vec4(180, 255, 180)
C_TC = vec4(255, 180, 80)
C_TR = vec4(180, 220, 255)
C_TRS = vec4(200, 180, 255)
# Dim colour used for field labels and secondary text (e.g. "commit:", project paths).
C_LBL = vec4(180, 180, 180)
# NOTE(review): C_VAL / C_KEY / C_NUM are not referenced in the visible part of the
# file; presumably value / key / number colours for the comms view - confirm.
C_VAL = vec4(220, 220, 220)
C_KEY = vec4(140, 200, 255)
C_NUM = vec4(180, 255, 180)
# Used for the discussion's "updated:" timestamp text.
C_SUB = vec4(220, 200, 120)
# Lookup tables from a direction / kind string to its colour.
DIR_COLORS = {"OUT": C_OUT, "IN": C_IN}
KIND_COLORS = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
# NOTE(review): not referenced in the visible part of the file; presumably the entry
# keys whose values are rendered through _render_heavy_text - confirm.
HEAVY_KEYS = {"message", "text", "script", "output", "content"}
# Default discussion roles, used when a project does not define its own list.
DISC_ROLES = ["User", "AI", "Vendor API", "System"]
def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict]:
    """Convert stored history strings into entry dicts.

    Each string is passed to ``project_manager.str_to_entry`` together with
    the list of known roles: ``roles`` when given, otherwise ``DISC_ROLES``.
    The result preserves the order of ``history``.
    """
    known_roles = DISC_ROLES if roles is None else roles
    return [project_manager.str_to_entry(line, known_roles) for line in history]
class ConfirmDialog:
    """A pending request to approve a script, with a blocking ``wait()``.

    Each instance gets a unique, increasing ``_uid``. The requester blocks in
    ``wait()`` until ``_event`` is set.
    NOTE(review): the code that sets ``_approved`` / ``_script`` and signals
    ``_event`` is not in the visible part of the file - presumably the GUI.
    """

    # Last uid handed out; shared by all instances.
    _next_id = 0

    def __init__(self, script: str, base_dir: str):
        uid = ConfirmDialog._next_id + 1
        ConfirmDialog._next_id = uid
        self._uid = uid
        # None is normalised to an empty string; anything else is stringified.
        self._script = "" if script is None else str(script)
        self._base_dir = "" if base_dir is None else str(base_dir)
        self._event = threading.Event()
        self._approved = False

    def wait(self) -> tuple[bool, str]:
        """Block until the event is set; return ``(approved, script)``."""
        self._event.wait()
        return (self._approved, self._script)
class App:
def __init__(self):
    """Load config and the active project, then set up all UI and runtime state.

    Order matters: the config is read first, ``_load_active_project`` fills
    ``self.project``, the project-derived fields are read from it, and only
    then are the session logger and ``ai_client`` callbacks wired up.
    """
    self.config = load_config()

    # ---- provider / model selection
    ai_cfg = self.config.get("ai", {})
    self.current_provider: str = ai_cfg.get("provider", "gemini")
    self.current_model: str = ai_cfg.get("model", "gemini-2.0-flash")
    self.available_models: list[str] = []

    # ---- project registry
    projects_cfg = self.config.get("projects", {})
    self.project_paths: list[str] = list(projects_cfg.get("paths", []))
    self.active_project_path: str = projects_cfg.get("active", "")
    self.project: dict = {}
    self.active_discussion: str = "main"
    self._load_active_project()

    # ---- project-derived state
    files_sec = self.project.get("files", {})
    shots_sec = self.project.get("screenshots", {})
    disc_sec = self.project.get("discussion", {})
    proj_meta = self.project.get("project", {})

    self.files: list[str] = list(files_sec.get("paths", []))
    self.screenshots: list[str] = list(shots_sec.get("paths", []))
    self.disc_roles: list[str] = list(disc_sec.get("roles", list(DISC_ROLES)))
    self.active_discussion = disc_sec.get("active", "main")
    active_disc = disc_sec.get("discussions", {}).get(self.active_discussion, {})
    self.disc_entries: list[dict] = _parse_history_entries(active_disc.get("history", []), self.disc_roles)

    # ---- values bound to UI widgets
    self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
    self.ui_files_base_dir = files_sec.get("base_dir", ".")
    self.ui_shots_base_dir = shots_sec.get("base_dir", ".")
    self.ui_project_git_dir = proj_meta.get("git_dir", "")
    self.ui_project_main_context = proj_meta.get("main_context", "")
    self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
    self.ui_word_wrap = proj_meta.get("word_wrap", True)
    self.ui_auto_add_history = disc_sec.get("auto_add", False)
    self.ui_global_system_prompt = ai_cfg.get("system_prompt", "")
    self.ui_ai_input = ""
    self.ui_disc_new_name_input = ""
    self.ui_disc_new_role_input = ""

    # ---- "last script" popup contents
    self.ui_last_script_text = ""
    self.ui_last_script_output = ""

    # ---- request / response state
    self.ai_status = "idle"
    self.ai_response = ""
    self.last_md = ""
    self.last_md_path: Path | None = None
    self.last_file_items: list = []
    self.send_thread: threading.Thread | None = None
    self.models_thread: threading.Thread | None = None

    # ---- window visibility; every window starts visible
    self.show_windows = dict.fromkeys(
        (
            "Projects",
            "Files",
            "Screenshots",
            "Discussion History",
            "Provider",
            "Message",
            "Response",
            "Tool Calls",
            "Comms History",
            "System Prompts",
            "Theme",
        ),
        True,
    )
    self.show_script_output = False
    self.show_text_viewer = False
    self.text_viewer_title = ""
    self.text_viewer_content = ""

    # ---- queues shared with callbacks; each has its own lock
    self._pending_dialog: ConfirmDialog | None = None
    self._pending_dialog_open = False
    self._pending_dialog_lock = threading.Lock()
    self._tool_log: list[tuple[str, str]] = []
    self._comms_log: list[dict] = []
    self._pending_comms: list[dict] = []
    self._pending_comms_lock = threading.Lock()
    self._pending_history_adds: list[dict] = []
    self._pending_history_adds_lock = threading.Lock()

    # ---- blink state for the response and script-output windows
    self._trigger_blink = False
    self._is_blinking = False
    self._blink_start_time = 0.0
    self._trigger_script_blink = False
    self._is_script_blinking = False
    self._script_blink_start_time = 0.0

    # ---- external wiring
    session_logger.open_session()
    ai_client.set_provider(self.current_provider, self.current_model)
    ai_client.confirm_and_run_callback = self._confirm_and_run
    ai_client.comms_log_callback = self._on_comms_entry
    ai_client.tool_log_callback = self._on_tool_log
# ---------------------------------------------------------------- project loading
def _load_active_project(self):
    """Populate ``self.project`` from the first project file that loads.

    Candidates are tried in order: the configured active path, then every
    entry of ``self.project_paths``. The first one that exists on disk and
    loads successfully becomes the active project. If none does, a project is
    migrated from the legacy config, saved as ``<name>.toml`` and registered.

    Every load failure is reported on stdout. Empty path strings are skipped
    (``Path("")`` resolves to the current directory and would otherwise pass
    the existence check), and a path that already failed is not retried.
    """
    attempted: set[str] = set()
    for candidate in (self.active_project_path, *self.project_paths):
        if not candidate or candidate in attempted:
            continue
        attempted.add(candidate)
        if not Path(candidate).exists():
            continue
        try:
            self.project = project_manager.load_project(candidate)
        except Exception as e:
            # Best effort: report the failure and fall through to the next candidate.
            print(f"Failed to load project {candidate}: {e}")
            continue
        self.active_project_path = candidate
        return

    # Nothing usable on disk: build a project from the legacy config layout.
    self.project = project_manager.migrate_from_legacy_config(self.config)
    name = self.project.get("project", {}).get("name", "project")
    fallback_path = f"{name}.toml"
    project_manager.save_project(self.project, fallback_path)
    self.active_project_path = fallback_path
    if fallback_path not in self.project_paths:
        self.project_paths.append(fallback_path)
def _switch_project(self, path: str):
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
self._refresh_from_project()
ai_client.reset_session()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self):
    """Re-read every project-derived attribute from ``self.project``.

    Missing sections or keys fall back to defaults: empty path lists,
    ``DISC_ROLES``, discussion ``"main"``, ``"."`` base dirs, ``"./md_gen"``
    output dir, empty strings, word-wrap on and auto-add off.
    """
    proj = self.project
    files_sec = proj.get("files", {})
    shots_sec = proj.get("screenshots", {})
    disc_sec = proj.get("discussion", {})
    meta = proj.get("project", {})

    self.files = list(files_sec.get("paths", []))
    self.screenshots = list(shots_sec.get("paths", []))

    self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
    self.active_discussion = disc_sec.get("active", "main")
    active_disc = disc_sec.get("discussions", {}).get(self.active_discussion, {})
    self.disc_entries = _parse_history_entries(active_disc.get("history", []), self.disc_roles)

    self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
    self.ui_files_base_dir = files_sec.get("base_dir", ".")
    self.ui_shots_base_dir = shots_sec.get("base_dir", ".")
    self.ui_project_git_dir = meta.get("git_dir", "")
    self.ui_project_system_prompt = meta.get("system_prompt", "")
    self.ui_project_main_context = meta.get("main_context", "")
    self.ui_auto_add_history = disc_sec.get("auto_add", False)
    self.ui_word_wrap = meta.get("word_wrap", True)
def _save_active_project(self):
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self.ai_status = f"save error: {e}"
# ---------------------------------------------------------------- discussion management
def _get_discussion_names(self) -> list[str]:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
return sorted(discussions.keys())
def _switch_discussion(self, name: str):
    """Make ``name`` the active discussion and load its history.

    The in-memory entries of the current discussion are flushed to the
    project first, even when ``name`` turns out not to exist. An unknown
    name only updates ``ai_status``.
    """
    self._flush_disc_entries_to_project()

    section = self.project.get("discussion", {})
    available = section.get("discussions", {})
    if name in available:
        self.active_discussion = name
        section["active"] = name
        stored_history = available[name].get("history", [])
        self.disc_entries = _parse_history_entries(stored_history, self.disc_roles)
        self.ai_status = f"discussion: {name}"
    else:
        self.ai_status = f"discussion not found: {name}"
def _flush_disc_entries_to_project(self):
    """Store the in-memory entries as the active discussion's history.

    Entries are serialised with ``project_manager.entry_to_str``. The
    ``discussion`` / ``discussions`` / active-discussion levels are created
    when missing, and ``last_updated`` is stamped with the current time.
    """
    serialised = [project_manager.entry_to_str(entry) for entry in self.disc_entries]
    target = (
        self.project
        .setdefault("discussion", {})
        .setdefault("discussions", {})
        .setdefault(self.active_discussion, project_manager.default_discussion())
    )
    target["history"] = serialised
    target["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str):
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self.ai_status = f"discussion '{name}' already exists"
return
discussions[name] = project_manager.default_discussion()
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self.ai_status = f"discussion '{new_name}' already exists"
return
discussions[new_name] = discussions.pop(old_name)
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str):
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
self.ai_status = "cannot delete the last discussion"
return
if name not in discussions:
return
del discussions[name]
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
# ---------------------------------------------------------------- logic
def _on_comms_entry(self, entry: dict):
    """Log a comms entry and queue it for the GUI.

    Registered as ``ai_client.comms_log_callback``. The entry is written to
    the session log, then appended to ``_pending_comms`` under its lock;
    ``_gui_func`` drains that queue into ``_comms_log``.
    """
    session_logger.log_comms(entry)
    queue_lock = self._pending_comms_lock
    with queue_lock:
        self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str):
    """Forward a tool call to the session log.

    Registered as ``ai_client.tool_log_callback``. The third argument of
    ``session_logger.log_tool_call`` is always passed as ``None`` here.
    """
    session_logger.log_tool_call(script, result, None)
def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
    """Ask for approval of ``script`` and, if approved, run it via PowerShell.

    Registered as ``ai_client.confirm_and_run_callback``. A ``ConfirmDialog``
    is published in ``_pending_dialog`` and this call blocks until it is
    resolved. The script returned by the dialog is the one that gets run and
    logged.

    Returns the PowerShell output, or ``None`` when the request is rejected.
    """
    pending = ConfirmDialog(script, base_dir)
    with self._pending_dialog_lock:
        self._pending_dialog = pending

    # Blocks until the dialog's event is set.
    was_approved, script_to_run = pending.wait()
    if was_approved:
        self.ai_status = "running powershell..."
        result = shell_runner.run_powershell(script_to_run, base_dir)
        self._append_tool_log(script_to_run, result)
        self.ai_status = "powershell done, awaiting AI..."
        return result

    self._append_tool_log(script_to_run, "REJECTED by user")
    return None
def _append_tool_log(self, script: str, result: str):
self._tool_log.append((script, result))
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
def _flush_to_project(self):
    """Copy the current UI state into ``self.project`` (in memory only).

    Missing sections are created. The file/screenshot path lists and the
    role list are stored by reference, not copied. Discussion entries are
    flushed through ``_flush_disc_entries_to_project``.
    """
    proj = self.project
    proj.setdefault("output", {})["output_dir"] = self.ui_output_dir

    files_sec = proj.setdefault("files", {})
    files_sec["base_dir"] = self.ui_files_base_dir
    files_sec["paths"] = self.files

    shots_sec = proj.setdefault("screenshots", {})
    shots_sec["base_dir"] = self.ui_shots_base_dir
    shots_sec["paths"] = self.screenshots

    meta = proj.setdefault("project", {})
    meta_values = (
        ("git_dir", self.ui_project_git_dir),
        ("system_prompt", self.ui_project_system_prompt),
        ("main_context", self.ui_project_main_context),
        ("word_wrap", self.ui_word_wrap),
    )
    for key, value in meta_values:
        meta[key] = value

    self._flush_disc_entries_to_project()

    disc_sec = proj.setdefault("discussion", {})
    disc_values = (
        ("roles", self.disc_roles),
        ("active", self.active_discussion),
        ("auto_add", self.ui_auto_add_history),
    )
    for key, value in disc_values:
        disc_sec[key] = value
def _flush_to_config(self):
    """Copy provider, project-registry and theme state into ``self.config``.

    The ``ai`` and ``projects`` sections are replaced wholesale, so any other
    keys they held are dropped. Theme settings are added by
    ``theme.save_to_config``. Nothing is written to disk here.
    """
    self.config["ai"] = {
        "provider": self.current_provider,
        "model": self.current_model,
        "system_prompt": self.ui_global_system_prompt,
    }
    self.config["projects"] = {
        "paths": self.project_paths,
        "active": self.active_project_path,
    }
    theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list]:
    """Save everything, then run the aggregator on the active discussion.

    The project and the config are flushed and written to disk first. The
    project is then flattened for the active discussion and handed to
    ``aggregate.run``, whose result is returned unchanged; callers unpack it
    as ``(md, path, file_items)``.
    """
    # Persist project state, then application config.
    self._flush_to_project()
    self._save_active_project()
    self._flush_to_config()
    save_config(self.config)

    flat_cfg = project_manager.flat_config(self.project, self.active_discussion)
    generated = aggregate.run(flat_cfg)
    return generated
def _fetch_models(self, provider: str):
    """Fetch the model list for ``provider`` on a daemon thread.

    Progress and errors are reported through ``ai_status``. When the current
    model is not in a non-empty result, the first fetched model is selected
    and pushed to ``ai_client``.
    """
    self.ai_status = "fetching models..."

    def worker():
        try:
            fetched = ai_client.list_models(provider)
            self.available_models = fetched
            if self.current_model not in fetched and fetched:
                # Current model is not offered: fall back to the first one.
                self.current_model = fetched[0]
                ai_client.set_provider(self.current_provider, self.current_model)
            self.ai_status = f"models loaded: {len(fetched)}"
        except Exception as e:
            self.ai_status = f"model fetch error: {e}"

    fetch_thread = threading.Thread(target=worker, daemon=True)
    self.models_thread = fetch_thread
    fetch_thread.start()
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str):
    """Draw a ``[+]`` button that opens ``content`` in the text viewer.

    The button's imgui ID suffix is ``id(content)``. On click, the viewer is
    flagged visible and given ``label`` as title and ``content`` as body.
    """
    clicked = imgui.button(f"[+]##{id(content)}")
    if not clicked:
        return
    self.show_text_viewer = True
    self.text_viewer_title = label
    self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str):
    """Draw a labelled text field with a ``[+]`` viewer button.

    Content longer than ``COMMS_CLAMP_CHARS`` is shown in a read-only
    multiline box when word-wrap is off. In every other case it is drawn as
    plain text, wrapped when word-wrap is on. Empty short content is shown
    as ``"(empty)"``.
    """
    imgui.text_colored(C_LBL, f"{label}:")
    imgui.same_line()
    self._render_text_viewer(label, content)

    wrap = self.ui_word_wrap
    over_limit = len(content) > COMMS_CLAMP_CHARS

    if over_limit and not wrap:
        imgui.input_text_multiline(f"##{id(content)}", content, imgui.ImVec2(-1, 80), imgui.InputTextFlags_.read_only)
        return

    # Only the short branch substitutes the placeholder for empty content.
    shown = content if (over_limit or content) else "(empty)"
    if not wrap:
        imgui.text(shown)
        return

    imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
    imgui.text(shown)
    imgui.pop_text_wrap_pos()
# ---------------------------------------------------------------- gui
def _gui_func(self):
# Sync pending comms
with self._pending_comms_lock:
for c in self._pending_comms:
self._comms_log.append(c)
self._pending_comms.clear()
with self._pending_history_adds_lock:
for item in self._pending_history_adds:
if item["role"] not in self.disc_roles:
self.disc_roles.append(item["role"])
self.disc_entries.append(item)
self._pending_history_adds.clear()
if imgui.begin_main_menu_bar():
if imgui.begin_menu("Windows"):
for w in self.show_windows.keys():
_, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w])
imgui.end_menu()
if imgui.begin_menu("Project"):
if imgui.menu_item("Save All")[0]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
if imgui.menu_item("Reset Session")[0]:
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.ai_status = "session reset"
self.ai_response = ""
if imgui.menu_item("Generate MD Only")[0]:
try:
md, path, _ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
except Exception as e:
self.ai_status = f"error: {e}"
imgui.end_menu()
imgui.end_main_menu_bar()
# ---- Projects
if self.show_windows["Projects"]:
exp, self.show_windows["Projects"] = imgui.begin("Projects", self.show_windows["Projects"])
if exp:
proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem)
imgui.text_colored(C_IN, f"Active: {proj_name}")
imgui.separator()
imgui.text("Git Directory")
ch, self.ui_project_git_dir = imgui.input_text("##git_dir", self.ui_project_git_dir)
imgui.same_line()
if imgui.button("Browse##git"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Git Directory")
r.destroy()
if d: self.ui_project_git_dir = d
imgui.separator()
imgui.text("Main Context File")
ch, self.ui_project_main_context = imgui.input_text("##main_ctx", self.ui_project_main_context)
imgui.same_line()
if imgui.button("Browse##ctx"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select Main Context File")
r.destroy()
if p: self.ui_project_main_context = p
imgui.separator()
imgui.text("Output Dir")
ch, self.ui_output_dir = imgui.input_text("##out_dir", self.ui_output_dir)
imgui.same_line()
if imgui.button("Browse##out"):
r = hide_tk_root()
d = filedialog.askdirectory(title="Select Output Dir")
r.destroy()
if d: self.ui_output_dir = d
imgui.separator()
imgui.text("Project Files")
imgui.begin_child("proj_files", imgui.ImVec2(0, 150), True)
for i, pp in enumerate(self.project_paths):
is_active = (pp == self.active_project_path)
if imgui.button(f"x##p{i}"):
removed = self.project_paths.pop(i)
if removed == self.active_project_path and self.project_paths:
self._switch_project(self.project_paths[0])
break
imgui.same_line()
marker = " *" if is_active else ""
if is_active: imgui.push_style_color(imgui.Col_.text, C_IN)
if imgui.button(f"{Path(pp).stem}{marker}##ps{i}"):
self._switch_project(pp)
if is_active: imgui.pop_style_color()
imgui.same_line()
imgui.text_colored(C_LBL, pp)
imgui.end_child()
if imgui.button("Add Project"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select Project .toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")])
r.destroy()
if p and p not in self.project_paths:
self.project_paths.append(p)
imgui.same_line()
if imgui.button("New Project"):
r = hide_tk_root()
p = filedialog.asksaveasfilename(title="Create New Project .toml", defaultextension=".toml", filetypes=[("TOML", "*.toml"), ("All", "*.*")])
r.destroy()
if p:
name = Path(p).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, p)
if p not in self.project_paths:
self.project_paths.append(p)
self._switch_project(p)
imgui.same_line()
if imgui.button("Save All"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
ch, self.ui_word_wrap = imgui.checkbox("Word-Wrap (Read-only panels)", self.ui_word_wrap)
imgui.end()
# ---- Files
if self.show_windows["Files"]:
exp, self.show_windows["Files"] = imgui.begin("Files", self.show_windows["Files"])
if exp:
imgui.text("Base Dir")
ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
imgui.same_line()
if imgui.button("Browse##fb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_files_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("f_paths", imgui.ImVec2(0, -40), True)
for i, f in enumerate(self.files):
if imgui.button(f"x##f{i}"):
self.files.pop(i)
break
imgui.same_line()
imgui.text(f)
imgui.end_child()
if imgui.button("Add File(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames()
r.destroy()
for p in paths:
if p not in self.files: self.files.append(p)
imgui.same_line()
if imgui.button("Add Wildcard"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.files.append(str(Path(d) / "**" / "*"))
imgui.end()
# ---- Screenshots
if self.show_windows["Screenshots"]:
exp, self.show_windows["Screenshots"] = imgui.begin("Screenshots", self.show_windows["Screenshots"])
if exp:
imgui.text("Base Dir")
ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
imgui.same_line()
if imgui.button("Browse##sb"):
r = hide_tk_root()
d = filedialog.askdirectory()
r.destroy()
if d: self.ui_shots_base_dir = d
imgui.separator()
imgui.text("Paths")
imgui.begin_child("s_paths", imgui.ImVec2(0, -40), True)
for i, s in enumerate(self.screenshots):
if imgui.button(f"x##s{i}"):
self.screenshots.pop(i)
break
imgui.same_line()
imgui.text(s)
imgui.end_child()
if imgui.button("Add Screenshot(s)"):
r = hide_tk_root()
paths = filedialog.askopenfilenames()
r.destroy()
for p in paths:
if p not in self.screenshots: self.screenshots.append(p)
imgui.end()
# ---- Discussion History
if self.show_windows["Discussion History"]:
exp, self.show_windows["Discussion History"] = imgui.begin("Discussion History", self.show_windows["Discussion History"])
if exp:
if imgui.collapsing_header("Discussions", imgui.TreeNodeFlags_.default_open):
names = self._get_discussion_names()
if imgui.begin_combo("##disc_sel", self.active_discussion):
for name in names:
is_selected = (name == self.active_discussion)
if imgui.selectable(name, is_selected)[0]:
self._switch_discussion(name)
if is_selected:
imgui.set_item_default_focus()
imgui.end_combo()
disc_sec = self.project.get("discussion", {})
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
git_commit = disc_data.get("git_commit", "")
last_updated = disc_data.get("last_updated", "")
imgui.text_colored(C_LBL, "commit:")
imgui.same_line()
imgui.text_colored(C_IN if git_commit else C_LBL, git_commit[:12] if git_commit else "(none)")
imgui.same_line()
if imgui.button("Update Commit"):
git_dir = self.ui_project_git_dir
if git_dir:
cmt = project_manager.get_git_commit(git_dir)
if cmt:
disc_data["git_commit"] = cmt
disc_data["last_updated"] = project_manager.now_ts()
self.ai_status = f"commit: {cmt[:12]}"
imgui.text_colored(C_LBL, "updated:")
imgui.same_line()
imgui.text_colored(C_SUB, last_updated if last_updated else "(never)")
ch, self.ui_disc_new_name_input = imgui.input_text("##new_disc", self.ui_disc_new_name_input)
imgui.same_line()
if imgui.button("Create"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._create_discussion(nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Rename"):
nm = self.ui_disc_new_name_input.strip()
if nm: self._rename_discussion(self.active_discussion, nm); self.ui_disc_new_name_input = ""
imgui.same_line()
if imgui.button("Delete"):
self._delete_discussion(self.active_discussion)
imgui.separator()
if imgui.button("+ Entry"):
self.disc_entries.append({"role": self.disc_roles[0] if self.disc_roles else "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
if imgui.button("-All"):
for e in self.disc_entries: e["collapsed"] = True
imgui.same_line()
if imgui.button("+All"):
for e in self.disc_entries: e["collapsed"] = False
imgui.same_line()
if imgui.button("Clear All"):
self.disc_entries.clear()
imgui.same_line()
if imgui.button("Save"):
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "discussion saved"
ch, self.ui_auto_add_history = imgui.checkbox("Auto-add message & response to history", self.ui_auto_add_history)
imgui.separator()
if imgui.collapsing_header("Roles"):
imgui.begin_child("roles_scroll", imgui.ImVec2(0, 100), True)
for i, r in enumerate(self.disc_roles):
if imgui.button(f"x##r{i}"):
self.disc_roles.pop(i)
break
imgui.same_line()
imgui.text(r)
imgui.end_child()
ch, self.ui_disc_new_role_input = imgui.input_text("##new_role", self.ui_disc_new_role_input)
imgui.same_line()
if imgui.button("Add"):
r = self.ui_disc_new_role_input.strip()
if r and r not in self.disc_roles:
self.disc_roles.append(r)
self.ui_disc_new_role_input = ""
imgui.separator()
imgui.begin_child("disc_scroll", imgui.ImVec2(0, 0), False)
for i, entry in enumerate(self.disc_entries):
imgui.push_id(str(i))
collapsed = entry.get("collapsed", False)
read_mode = entry.get("read_mode", False)
if imgui.button("+" if collapsed else "-"):
entry["collapsed"] = not collapsed
imgui.same_line()
imgui.set_next_item_width(120)
if imgui.begin_combo("##role", entry["role"]):
for r in self.disc_roles:
if imgui.selectable(r, r == entry["role"])[0]:
entry["role"] = r
imgui.end_combo()
if not collapsed:
imgui.same_line()
if imgui.button("[Edit]" if read_mode else "[Read]"):
entry["read_mode"] = not read_mode
ts_str = entry.get("ts", "")
if ts_str:
imgui.same_line()
imgui.text_colored(vec4(120, 120, 100), ts_str)
if collapsed:
imgui.same_line()
if imgui.button("Ins"):
self.disc_entries.insert(i, {"role": "User", "content": "", "collapsed": False, "ts": project_manager.now_ts()})
imgui.same_line()
self._render_text_viewer(f"Entry #{i+1}", entry["content"])
imgui.same_line()
if imgui.button("Del"):
self.disc_entries.pop(i)
imgui.pop_id()
break
imgui.same_line()
preview = entry["content"].replace("\n", " ")[:60]
if len(entry["content"]) > 60: preview += "..."
imgui.text_colored(vec4(160, 160, 150), preview)
if not collapsed:
if read_mode:
imgui.begin_child("read_content", imgui.ImVec2(0, 150), True)
if self.ui_word_wrap: imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(entry["content"])
if self.ui_word_wrap: imgui.pop_text_wrap_pos()
imgui.end_child()
else:
ch, entry["content"] = imgui.input_text_multiline("##content", entry["content"], imgui.ImVec2(-1, 150))
imgui.separator()
imgui.pop_id()
imgui.end_child()
imgui.end()
# ---- Provider
if self.show_windows["Provider"]:
exp, self.show_windows["Provider"] = imgui.begin("Provider", self.show_windows["Provider"])
if exp:
imgui.text("Provider")
if imgui.begin_combo("##prov", self.current_provider):
for p in PROVIDERS:
if imgui.selectable(p, p == self.current_provider)[0]:
self.current_provider = p
ai_client.reset_session()
ai_client.set_provider(p, self.current_model)
self.available_models = []
self._fetch_models(p)
imgui.end_combo()
imgui.separator()
imgui.text("Model")
imgui.same_line()
if imgui.button("Fetch Models"):
self._fetch_models(self.current_provider)
if imgui.begin_list_box("##models", imgui.ImVec2(-1, 120)):
for m in self.available_models:
if imgui.selectable(m, m == self.current_model)[0]:
self.current_model = m
ai_client.reset_session()
ai_client.set_provider(self.current_provider, m)
imgui.end_list_box()
imgui.end()
# ---- Message
if self.show_windows["Message"]:
exp, self.show_windows["Message"] = imgui.begin("Message", self.show_windows["Message"])
if exp:
ch, self.ui_ai_input = imgui.input_text_multiline("##ai_in", self.ui_ai_input, imgui.ImVec2(-1, -40))
imgui.separator()
if imgui.button("Gen + Send"):
if not (self.send_thread and self.send_thread.is_alive()):
try:
md, path, file_items = self._do_generate()
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
else:
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
def do_send():
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "User", "content": user_msg, "collapsed": False, "ts": project_manager.now_ts()})
try:
resp = ai_client.send(self.last_md, user_msg, base_dir, self.last_file_items)
self.ai_response = resp
self.ai_status = "done"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "AI", "content": resp, "collapsed": False, "ts": project_manager.now_ts()})
except ProviderError as e:
self.ai_response = e.ui_message()
self.ai_status = "error"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "Vendor API", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
except Exception as e:
self.ai_response = f"ERROR: {e}"
self.ai_status = "error"
self._trigger_blink = True
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({"role": "System", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
self.send_thread = threading.Thread(target=do_send, daemon=True)
self.send_thread.start()
imgui.same_line()
if imgui.button("MD Only"):
try:
md, path, _ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
except Exception as e:
self.ai_status = f"error: {e}"
imgui.same_line()
if imgui.button("Reset"):
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.ai_status = "session reset"
self.ai_response = ""
imgui.same_line()
if imgui.button("-> History"):
if self.ui_ai_input:
self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()})
imgui.end()
# ---- Response
if self.show_windows["Response"]:
if self._trigger_blink:
self._trigger_blink = False
self._is_blinking = True
self._blink_start_time = time.time()
imgui.set_window_focus_str("Response")
if self._is_blinking:
elapsed = time.time() - self._blink_start_time
if elapsed > 1.5:
self._is_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 50/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 255, 0, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 255, 0, alpha))
exp, self.show_windows["Response"] = imgui.begin("Response", self.show_windows["Response"])
if exp:
if self.ui_word_wrap:
imgui.begin_child("resp_wrap", imgui.ImVec2(-1, -40), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ai_response)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##ai_out", self.ai_response, imgui.ImVec2(-1, -40), imgui.InputTextFlags_.read_only)
imgui.separator()
if imgui.button("-> History"):
if self.ai_response:
self.disc_entries.append({"role": "AI", "content": self.ai_response, "collapsed": False, "ts": project_manager.now_ts()})
if self._is_blinking:
imgui.pop_style_color(2)
imgui.end()
# ---- Tool Calls
if self.show_windows["Tool Calls"]:
exp, self.show_windows["Tool Calls"] = imgui.begin("Tool Calls", self.show_windows["Tool Calls"])
if exp:
imgui.text("Tool call history")
imgui.same_line()
if imgui.button("Clear##tc"):
self._tool_log.clear()
imgui.separator()
imgui.begin_child("tc_scroll")
for i, (script, result) in enumerate(self._tool_log, 1):
first_line = script.strip().splitlines()[0][:80] if script.strip() else "(empty)"
imgui.text_colored(C_KEY, f"Call #{i}: {first_line}")
imgui.same_line()
self._render_text_viewer(f"Call Script #{i}", script)
imgui.same_line()
self._render_text_viewer(f"Call Output #{i}", result)
if self.ui_word_wrap:
imgui.begin_child(f"tc_wrap_{i}", imgui.ImVec2(-1, 72), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(result)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline(f"##tc_res_{i}", result, imgui.ImVec2(-1, 72), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.end_child()
imgui.end()
# ---- Comms History
if self.show_windows["Comms History"]:
exp, self.show_windows["Comms History"] = imgui.begin("Comms History", self.show_windows["Comms History"])
if exp:
imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
imgui.same_line()
if imgui.button("Clear##comms"):
ai_client.clear_comms_log()
self._comms_log.clear()
imgui.separator()
imgui.text_colored(C_OUT, "OUT")
imgui.same_line()
imgui.text_colored(C_REQ, "request")
imgui.same_line()
imgui.text_colored(C_TC, "tool_call")
imgui.same_line()
imgui.text(" ")
imgui.same_line()
imgui.text_colored(C_IN, "IN")
imgui.same_line()
imgui.text_colored(C_RES, "response")
imgui.same_line()
imgui.text_colored(C_TR, "tool_result")
imgui.separator()
imgui.begin_child("comms_scroll", imgui.ImVec2(0, 0), False, imgui.WindowFlags_.horizontal_scrollbar)
for idx, entry in enumerate(self._comms_log, 1):
imgui.push_id(f"comms_{idx}")
d = entry["direction"]
k = entry["kind"]
imgui.text_colored(vec4(160, 160, 160), f"#{idx}")
imgui.same_line()
imgui.text_colored(vec4(160, 160, 160), entry["ts"])
imgui.same_line()
imgui.text_colored(DIR_COLORS.get(d, C_VAL), d)
imgui.same_line()
imgui.text_colored(KIND_COLORS.get(k, C_VAL), k)
imgui.same_line()
imgui.text_colored(C_LBL, f"{entry['provider']}/{entry['model']}")
payload = entry["payload"]
if k == "request":
self._render_heavy_text("message", payload.get("message", ""))
elif k == "response":
imgui.text_colored(C_LBL, "round:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("round", "")))
imgui.text_colored(C_LBL, "stop_reason:")
imgui.same_line()
imgui.text_colored(vec4(255, 200, 120), str(payload.get("stop_reason", "")))
text = payload.get("text", "")
if text:
self._render_heavy_text("text", text)
imgui.text_colored(C_LBL, "tool_calls:")
tcs = payload.get("tool_calls", [])
if not tcs:
imgui.text_colored(C_VAL, " (none)")
for i, tc in enumerate(tcs):
imgui.text_colored(C_KEY, f" call[{i}] {tc.get('name', '?')}")
if "id" in tc:
imgui.text_colored(C_LBL, " id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(tc["id"]))
args = tc.get("args") or tc.get("input") or {}
if isinstance(args, dict):
for ak, av in args.items():
self._render_heavy_text(f" {ak}", str(av))
elif args:
self._render_heavy_text(" args", str(args))
usage = payload.get("usage")
if usage:
imgui.text_colored(C_SUB, "usage:")
for uk, uv in usage.items():
imgui.text_colored(C_LBL, f" {uk.replace('_', ' ')}:")
imgui.same_line()
imgui.text_colored(C_NUM, str(uv))
elif k == "tool_call":
imgui.text_colored(C_LBL, "name:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("name", "")))
if "id" in payload:
imgui.text_colored(C_LBL, "id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
if "script" in payload:
self._render_heavy_text("script", payload.get("script", ""))
elif "args" in payload:
args = payload["args"]
if isinstance(args, dict):
for ak, av in args.items():
self._render_heavy_text(ak, str(av))
else:
self._render_heavy_text("args", str(args))
elif k == "tool_result":
imgui.text_colored(C_LBL, "name:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload.get("name", "")))
if "id" in payload:
imgui.text_colored(C_LBL, "id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(payload["id"]))
self._render_heavy_text("output", payload.get("output", ""))
elif k == "tool_result_send":
for i, r in enumerate(payload.get("results", [])):
imgui.text_colored(C_KEY, f"result[{i}]")
imgui.text_colored(C_LBL, " tool_use_id:")
imgui.same_line()
imgui.text_colored(C_VAL, str(r.get("tool_use_id", "")))
self._render_heavy_text(" content", str(r.get("content", "")))
else:
for key, val in payload.items():
vstr = json.dumps(val, ensure_ascii=False, indent=2) if isinstance(val, (dict, list)) else str(val)
if key in HEAVY_KEYS:
self._render_heavy_text(key, vstr)
else:
imgui.text_colored(C_LBL, f"{key}:")
imgui.same_line()
imgui.text_colored(C_VAL, vstr)
imgui.separator()
imgui.pop_id()
imgui.end_child()
imgui.end()
# ---- System Prompts
if self.show_windows["System Prompts"]:
exp, self.show_windows["System Prompts"] = imgui.begin("System Prompts", self.show_windows["System Prompts"])
if exp:
imgui.text("Global System Prompt (all projects)")
ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100))
imgui.separator()
imgui.text("Project System Prompt")
ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100))
imgui.end()
# ---- Theme
if self.show_windows["Theme"]:
exp, self.show_windows["Theme"] = imgui.begin("Theme", self.show_windows["Theme"])
if exp:
imgui.text("Palette")
cp = theme.get_current_palette()
if imgui.begin_combo("##pal", cp):
for p in theme.get_palette_names():
if imgui.selectable(p, p == cp)[0]:
theme.apply(p)
imgui.end_combo()
imgui.separator()
imgui.text("Font")
imgui.push_item_width(-150)
ch, path = imgui.input_text("##fontp", theme.get_current_font_path())
imgui.pop_item_width()
if ch: theme._current_font_path = path
imgui.same_line()
if imgui.button("Browse##font"):
r = hide_tk_root()
p = filedialog.askopenfilename(filetypes=[("Fonts", "*.ttf *.otf"), ("All", "*.*")])
r.destroy()
if p: theme._current_font_path = p
imgui.text("Size (px)")
imgui.same_line()
imgui.push_item_width(100)
ch, size = imgui.input_float("##fonts", theme.get_current_font_size(), 1.0, 1.0, "%.0f")
if ch: theme._current_font_size = size
imgui.pop_item_width()
imgui.same_line()
if imgui.button("Apply Font (Requires Restart)"):
self._flush_to_config()
save_config(self.config)
self.ai_status = "Font settings saved. Restart required."
imgui.separator()
imgui.text("UI Scale (DPI)")
ch, scale = imgui.slider_float("##scale", theme.get_current_scale(), 0.5, 3.0, "%.2f")
if ch: theme.set_scale(scale)
imgui.end()
# ---- Modals / Popups
with self._pending_dialog_lock:
dlg = self._pending_dialog
if dlg:
if not self._pending_dialog_open:
imgui.open_popup("Approve PowerShell Command")
self._pending_dialog_open = True
else:
self._pending_dialog_open = False
if imgui.begin_popup_modal("Approve PowerShell Command", None, imgui.WindowFlags_.always_auto_resize)[0]:
if dlg:
imgui.text("The AI wants to run the following PowerShell script:")
imgui.text_colored(vec4(200, 200, 100), f"base_dir: {dlg._base_dir}")
imgui.separator()
if imgui.button("[+ Maximize]##confirm"):
self.show_text_viewer = True
self.text_viewer_title = "Confirm Script"
self.text_viewer_content = dlg._script
ch, dlg._script = imgui.input_text_multiline("##confirm_script", dlg._script, imgui.ImVec2(-1, 300))
imgui.separator()
if imgui.button("Approve & Run", imgui.ImVec2(120, 0)):
dlg._approved = True
dlg._event.set()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Reject", imgui.ImVec2(120, 0)):
dlg._approved = False
dlg._event.set()
with self._pending_dialog_lock:
self._pending_dialog = None
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
self._is_script_blinking = True
self._script_blink_start_time = time.time()
imgui.set_window_focus_str("Last Script Output")
if self._is_script_blinking:
elapsed = time.time() - self._script_blink_start_time
if elapsed > 1.5:
self._is_script_blinking = False
else:
val = math.sin(elapsed * 8 * math.pi)
alpha = 60/255 if val > 0 else 0
imgui.push_style_color(imgui.Col_.frame_bg, vec4(0, 100, 255, alpha))
imgui.push_style_color(imgui.Col_.child_bg, vec4(0, 100, 255, alpha))
imgui.set_next_window_size(imgui.ImVec2(800, 600), imgui.Cond_.first_use_ever)
expanded, self.show_script_output = imgui.begin("Last Script Output", self.show_script_output)
if expanded:
imgui.text("Script:")
imgui.same_line()
self._render_text_viewer("Last Script", self.ui_last_script_text)
if self.ui_word_wrap:
imgui.begin_child("lso_s_wrap", imgui.ImVec2(-1, 200), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_text)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_s", self.ui_last_script_text, imgui.ImVec2(-1, 200), imgui.InputTextFlags_.read_only)
imgui.separator()
imgui.text("Output:")
imgui.same_line()
self._render_text_viewer("Last Output", self.ui_last_script_output)
if self.ui_word_wrap:
imgui.begin_child("lso_o_wrap", imgui.ImVec2(-1, -1), True)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.ui_last_script_output)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##lso_o", self.ui_last_script_output, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
if self._is_script_blinking:
imgui.pop_style_color(2)
imgui.end()
if self.show_text_viewer:
imgui.set_next_window_size(imgui.ImVec2(900, 700), imgui.Cond_.first_use_ever)
expanded, self.show_text_viewer = imgui.begin(f"Text Viewer - {self.text_viewer_title}", self.show_text_viewer)
if expanded:
if self.ui_word_wrap:
imgui.begin_child("tv_wrap", imgui.ImVec2(-1, -1), False)
imgui.push_text_wrap_pos(imgui.get_content_region_avail().x)
imgui.text(self.text_viewer_content)
imgui.pop_text_wrap_pos()
imgui.end_child()
else:
imgui.input_text_multiline("##tv_c", self.text_viewer_content, imgui.ImVec2(-1, -1), imgui.InputTextFlags_.read_only)
imgui.end()
def _load_fonts(self):
    """Load the theme's configured font through hello_imgui, when usable.

    Assigned in ``run`` as the runner's ``load_additional_fonts`` callback.
    The font path and size come from ``theme.get_font_loading_params()``.
    Nothing is loaded when the path is empty/falsy or does not exist on disk.
    """
    configured_path, pixel_size = theme.get_font_loading_params()

    # No font configured: leave whatever font the runner already has.
    if not configured_path:
        return

    # A configured path that is missing on disk is ignored, not reported.
    if not Path(configured_path).exists():
        return

    hello_imgui.load_font(configured_path, pixel_size)
def run(self):
    """Configure the hello_imgui runner, run the GUI, then persist state.

    Steps, in order:
      1. Apply theme settings from ``self.config``.
      2. Build ``self.runner_params`` (window title/size, viewports,
         full-screen dock space, GUI and font-loading callbacks).
      3. Request the model list for the current provider.
      4. Hand control to ``immapp.run``.
      5. Once ``immapp.run`` returns, flush and save the project and the
         config, then close the session logger.
    """
    theme.load_from_config(self.config)

    # Keep a local alias; the same object is stored on the instance.
    params = hello_imgui.RunnerParams()
    self.runner_params = params

    # Main application window.
    app_window = params.app_window_params
    app_window.window_title = "manual slop"
    app_window.window_geometry.size = (1680, 1200)

    # ImGui window behaviour: multi-viewport plus a full-screen dock space.
    imgui_window = params.imgui_window_params
    imgui_window.enable_viewports = True
    imgui_window.default_imgui_window_type = (
        hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space
    )

    # Per-frame drawing and font-loading hooks.
    hooks = params.callbacks
    hooks.show_gui = self._gui_func
    hooks.load_additional_fonts = self._load_fonts

    self._fetch_models(self.current_provider)
    immapp.run(params)

    # On exit: write project state first, then the config, then close the log.
    self._flush_to_project()
    self._save_active_project()
    self._flush_to_config()
    save_config(self.config)
    session_logger.close_session()
def main():
    """Create the application object and run it."""
    App().run()


# Launch the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()