Files
manual_slop/gui.py
2026-02-21 15:28:09 -05:00

630 lines
21 KiB
Python

import dearpygui.dearpygui as dpg
import tomllib
import tomli_w
import threading
from pathlib import Path
from tkinter import filedialog, Tk
import aggregate
import ai_client
from ai_client import ProviderError
import shell_runner
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic"]
def load_config() -> dict:
with open(CONFIG_PATH, "rb") as f:
return tomllib.load(f)
def save_config(config: dict):
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
def hide_tk_root() -> Tk:
root = Tk()
root.withdraw()
root.wm_attributes("-topmost", True)
return root
class ConfirmDialog:
"""
Modal confirmation window for a proposed PowerShell script.
Background thread calls wait(), which blocks on a threading.Event.
Main render loop detects _pending_dialog and calls show() on the next frame.
User clicks Approve or Reject, which sets the event and unblocks the thread.
"""
_next_id = 0
def __init__(self, script: str, base_dir: str):
ConfirmDialog._next_id += 1
self._uid = ConfirmDialog._next_id
self._tag = f"confirm_dlg_{self._uid}"
self._script = script
self._base_dir = base_dir
self._event = threading.Event()
self._approved = False
def show(self):
"""Called from main thread only."""
w, h = 700, 440
vp_w = dpg.get_viewport_width()
vp_h = dpg.get_viewport_height()
px = max(0, (vp_w - w) // 2)
py = max(0, (vp_h - h) // 2)
with dpg.window(
label=f"Approve PowerShell Command #{self._uid}",
tag=self._tag,
modal=True,
no_close=True,
pos=(px, py),
width=w,
height=h,
):
dpg.add_text("The AI wants to run the following PowerShell script:")
dpg.add_text(f"base_dir: {self._base_dir}", color=(200, 200, 100))
dpg.add_separator()
dpg.add_input_text(
tag=f"{self._tag}_script",
default_value=self._script,
multiline=True,
width=-1,
height=-72,
readonly=False,
)
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_button(label="Approve & Run", callback=self._cb_approve)
dpg.add_button(label="Reject", callback=self._cb_reject)
def _cb_approve(self):
self._script = dpg.get_value(f"{self._tag}_script")
self._approved = True
self._event.set()
dpg.delete_item(self._tag)
def _cb_reject(self):
self._approved = False
self._event.set()
dpg.delete_item(self._tag)
def wait(self) -> tuple[bool, str]:
"""Called from background thread. Blocks until user acts."""
self._event.wait()
return self._approved, self._script
class App:
def __init__(self):
self.config = load_config()
self.files: list[str] = list(self.config["files"].get("paths", []))
self.screenshots: list[str] = list(
self.config.get("screenshots", {}).get("paths", [])
)
self.history: list[str] = list(
self.config.get("discussion", {}).get("history", [])
)
ai_cfg = self.config.get("ai", {})
self.current_provider: str = ai_cfg.get("provider", "gemini")
self.current_model: str = ai_cfg.get("model", "gemini-2.0-flash")
self.available_models: list[str] = []
self.ai_status = "idle"
self.ai_response = ""
self.last_md = ""
self.last_md_path: Path | None = None
self.send_thread: threading.Thread | None = None
self.models_thread: threading.Thread | None = None
self._pending_dialog: ConfirmDialog | None = None
self._pending_dialog_lock = threading.Lock()
self._tool_log: list[tuple[str, str]] = []
ai_client.set_provider(self.current_provider, self.current_model)
ai_client.confirm_and_run_callback = self._confirm_and_run
# ---------------------------------------------------------------- tool execution
def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
dialog = ConfirmDialog(script, base_dir)
with self._pending_dialog_lock:
self._pending_dialog = dialog
approved, final_script = dialog.wait()
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self._update_status("running powershell...")
output = shell_runner.run_powershell(final_script, base_dir)
self._append_tool_log(final_script, output)
self._update_status("powershell done, awaiting AI...")
return output
def _append_tool_log(self, script: str, result: str):
self._tool_log.append((script, result))
self._rebuild_tool_log()
def _rebuild_tool_log(self):
if not dpg.does_item_exist("tool_log_scroll"):
return
dpg.delete_item("tool_log_scroll", children_only=True)
for i, (script, result) in enumerate(self._tool_log, 1):
with dpg.group(parent="tool_log_scroll"):
dpg.add_text(f"Call #{i}", color=(140, 200, 255))
dpg.add_input_text(
default_value=script,
multiline=True,
readonly=True,
width=-1,
height=72,
)
dpg.add_text("Result:", color=(180, 255, 180))
dpg.add_input_text(
default_value=result,
multiline=True,
readonly=True,
width=-1,
height=72,
)
dpg.add_separator()
# ---------------------------------------------------------------- helpers
def _flush_to_config(self):
self.config["output"]["namespace"] = dpg.get_value("namespace")
self.config["output"]["output_dir"] = dpg.get_value("output_dir")
self.config["files"]["base_dir"] = dpg.get_value("files_base_dir")
if "screenshots" not in self.config:
self.config["screenshots"] = {}
self.config["screenshots"]["base_dir"] = dpg.get_value("shots_base_dir")
self.config["files"]["paths"] = self.files
self.config["screenshots"]["paths"] = self.screenshots
raw = dpg.get_value("discussion_box")
self.history = [s.strip() for s in raw.split("---") if s.strip()]
self.config["discussion"] = {"history": self.history}
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
}
def _do_generate(self) -> tuple[str, Path]:
self._flush_to_config()
save_config(self.config)
return aggregate.run(self.config)
def _update_status(self, status: str):
self.ai_status = status
if dpg.does_item_exist("ai_status"):
dpg.set_value("ai_status", f"Status: {status}")
def _update_response(self, text: str):
self.ai_response = text
if dpg.does_item_exist("ai_response"):
dpg.set_value("ai_response", text)
def _rebuild_files_list(self):
if not dpg.does_item_exist("files_scroll"):
return
dpg.delete_item("files_scroll", children_only=True)
for i, f in enumerate(self.files):
with dpg.group(horizontal=True, parent="files_scroll"):
dpg.add_button(
label="x", width=24, callback=self._make_remove_file_cb(i)
)
dpg.add_text(f)
def _rebuild_shots_list(self):
if not dpg.does_item_exist("shots_scroll"):
return
dpg.delete_item("shots_scroll", children_only=True)
for i, s in enumerate(self.screenshots):
with dpg.group(horizontal=True, parent="shots_scroll"):
dpg.add_button(
label="x", width=24, callback=self._make_remove_shot_cb(i)
)
dpg.add_text(s)
def _rebuild_models_list(self):
if not dpg.does_item_exist("model_listbox"):
return
dpg.configure_item("model_listbox", items=self.available_models)
if self.current_model in self.available_models:
dpg.set_value("model_listbox", self.current_model)
elif self.available_models:
self.current_model = self.available_models[0]
dpg.set_value("model_listbox", self.current_model)
ai_client.set_provider(self.current_provider, self.current_model)
def _make_remove_file_cb(self, idx: int):
def cb():
if idx < len(self.files):
self.files.pop(idx)
self._rebuild_files_list()
return cb
def _make_remove_shot_cb(self, idx: int):
def cb():
if idx < len(self.screenshots):
self.screenshots.pop(idx)
self._rebuild_shots_list()
return cb
def _fetch_models(self, provider: str):
self._update_status("fetching models...")
def do_fetch():
try:
models = ai_client.list_models(provider)
self.available_models = models
self._rebuild_models_list()
self._update_status(f"models loaded: {len(models)}")
except Exception as e:
self._update_status(f"model fetch error: {e}")
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
# ---------------------------------------------------------------- callbacks
def cb_browse_output(self):
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Output Dir")
root.destroy()
if d:
dpg.set_value("output_dir", d)
def cb_save_config(self):
self._flush_to_config()
save_config(self.config)
self._update_status("config saved")
def cb_browse_files_base(self):
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Files Base Dir")
root.destroy()
if d:
dpg.set_value("files_base_dir", d)
def cb_add_files(self):
root = hide_tk_root()
paths = filedialog.askopenfilenames(title="Select Files")
root.destroy()
for p in paths:
if p not in self.files:
self.files.append(p)
self._rebuild_files_list()
def cb_add_wildcard(self):
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Dir for Wildcard")
root.destroy()
if d:
self.files.append(str(Path(d) / "**" / "*"))
self._rebuild_files_list()
def cb_browse_shots_base(self):
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Screenshots Base Dir")
root.destroy()
if d:
dpg.set_value("shots_base_dir", d)
def cb_add_shots(self):
root = hide_tk_root()
paths = filedialog.askopenfilenames(
title="Select Screenshots",
filetypes=[
("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"),
("All", "*.*"),
],
)
root.destroy()
for p in paths:
if p not in self.screenshots:
self.screenshots.append(p)
self._rebuild_shots_list()
def cb_add_excerpt(self):
current = dpg.get_value("discussion_box")
dpg.set_value("discussion_box", current + "\n---\n")
def cb_clear_discussion(self):
dpg.set_value("discussion_box", "")
def cb_save_discussion(self):
self._flush_to_config()
save_config(self.config)
self._update_status("discussion saved")
def cb_md_only(self):
try:
md, path = self._do_generate()
self.last_md = md
self.last_md_path = path
self._update_status(f"md written: {path.name}")
except Exception as e:
self._update_status(f"error: {e}")
def cb_reset_session(self):
ai_client.reset_session()
self._tool_log.clear()
self._rebuild_tool_log()
self._update_status("session reset")
self._update_response("")
def cb_generate_send(self):
if self.send_thread and self.send_thread.is_alive():
return
try:
md, path = self._do_generate()
self.last_md = md
self.last_md_path = path
except Exception as e:
self._update_status(f"generate error: {e}")
return
self._update_status("sending...")
user_msg = dpg.get_value("ai_input")
base_dir = dpg.get_value("files_base_dir")
def do_send():
try:
response = ai_client.send(self.last_md, user_msg, base_dir)
self._update_response(response)
self._update_status("done")
except Exception as e:
self._update_response(f"ERROR: {e}")
self._update_status("error")
self.send_thread = threading.Thread(target=do_send, daemon=True)
self.send_thread.start()
def cb_provider_changed(self, sender, app_data):
self.current_provider = app_data
ai_client.reset_session()
ai_client.set_provider(self.current_provider, self.current_model)
self.available_models = []
self._rebuild_models_list()
self._fetch_models(self.current_provider)
def cb_model_changed(self, sender, app_data):
if app_data:
self.current_model = app_data
ai_client.reset_session()
ai_client.set_provider(self.current_provider, self.current_model)
self._update_status(f"model set: {self.current_model}")
def cb_fetch_models(self):
self._fetch_models(self.current_provider)
def cb_clear_tool_log(self):
self._tool_log.clear()
self._rebuild_tool_log()
# ---------------------------------------------------------------- build ui
def _build_ui(self):
with dpg.window(
label="Config",
tag="win_config",
pos=(8, 8),
width=400,
height=200,
no_close=True,
):
dpg.add_text("Namespace")
dpg.add_input_text(
tag="namespace",
default_value=self.config["output"]["namespace"],
width=-1,
)
dpg.add_text("Output Dir")
dpg.add_input_text(
tag="output_dir",
default_value=self.config["output"]["output_dir"],
width=-1,
)
with dpg.group(horizontal=True):
dpg.add_button(label="Browse Output Dir", callback=self.cb_browse_output)
dpg.add_button(label="Save Config", callback=self.cb_save_config)
with dpg.window(
label="Files",
tag="win_files",
pos=(8, 216),
width=400,
height=500,
no_close=True,
):
dpg.add_text("Base Dir")
with dpg.group(horizontal=True):
dpg.add_input_text(
tag="files_base_dir",
default_value=self.config["files"]["base_dir"],
width=-220,
)
dpg.add_button(
label="Browse##filesbase", callback=self.cb_browse_files_base
)
dpg.add_separator()
dpg.add_text("Paths")
with dpg.child_window(tag="files_scroll", height=-64, border=True):
pass
self._rebuild_files_list()
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_button(label="Add File(s)", callback=self.cb_add_files)
dpg.add_button(label="Add Wildcard", callback=self.cb_add_wildcard)
with dpg.window(
label="Screenshots",
tag="win_screenshots",
pos=(416, 8),
width=400,
height=500,
no_close=True,
):
dpg.add_text("Base Dir")
with dpg.group(horizontal=True):
dpg.add_input_text(
tag="shots_base_dir",
default_value=self.config.get("screenshots", {}).get("base_dir", "."),
width=-220,
)
dpg.add_button(
label="Browse##shotsbase", callback=self.cb_browse_shots_base
)
dpg.add_separator()
dpg.add_text("Paths")
with dpg.child_window(tag="shots_scroll", height=-48, border=True):
pass
self._rebuild_shots_list()
dpg.add_separator()
dpg.add_button(label="Add Screenshot(s)", callback=self.cb_add_shots)
with dpg.window(
label="Discussion History",
tag="win_discussion",
pos=(824, 8),
width=400,
height=500,
no_close=True,
):
dpg.add_input_text(
tag="discussion_box",
default_value="\n---\n".join(self.history),
multiline=True,
width=-1,
height=-64,
)
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_button(label="Add Separator", callback=self.cb_add_excerpt)
dpg.add_button(label="Clear", callback=self.cb_clear_discussion)
dpg.add_button(label="Save", callback=self.cb_save_discussion)
with dpg.window(
label="Provider",
tag="win_provider",
pos=(1232, 8),
width=420,
height=280,
no_close=True,
):
dpg.add_text("Provider")
dpg.add_combo(
tag="provider_combo",
items=PROVIDERS,
default_value=self.current_provider,
width=-1,
callback=self.cb_provider_changed,
)
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_text("Model")
dpg.add_button(label="Fetch Models", callback=self.cb_fetch_models)
dpg.add_listbox(
tag="model_listbox",
items=self.available_models,
default_value=self.current_model,
width=-1,
num_items=6,
callback=self.cb_model_changed,
)
dpg.add_separator()
dpg.add_text("Status: idle", tag="ai_status")
with dpg.window(
label="Message",
tag="win_message",
pos=(1232, 296),
width=420,
height=280,
no_close=True,
):
dpg.add_input_text(
tag="ai_input",
multiline=True,
width=-1,
height=-64,
)
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_button(label="Gen + Send", callback=self.cb_generate_send)
dpg.add_button(label="MD Only", callback=self.cb_md_only)
dpg.add_button(label="Reset", callback=self.cb_reset_session)
with dpg.window(
label="Response",
tag="win_response",
pos=(1232, 584),
width=420,
height=300,
no_close=True,
):
dpg.add_input_text(
tag="ai_response",
multiline=True,
readonly=True,
width=-1,
height=-1,
)
with dpg.window(
label="Tool Calls",
tag="win_tool_log",
pos=(1232, 892),
width=420,
height=300,
no_close=True,
):
with dpg.group(horizontal=True):
dpg.add_text("Tool call history")
dpg.add_button(label="Clear", callback=self.cb_clear_tool_log)
dpg.add_separator()
with dpg.child_window(tag="tool_log_scroll", height=-1, border=False):
pass
def run(self):
dpg.create_context()
dpg.configure_app(docking=True, docking_space=True, init_file="dpg_layout.ini")
dpg.create_viewport(title="manual slop", width=1680, height=1200)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.maximize_viewport()
self._build_ui()
self._fetch_models(self.current_provider)
while dpg.is_dearpygui_running():
# Show any pending confirmation dialog on the main thread
with self._pending_dialog_lock:
dialog = self._pending_dialog
self._pending_dialog = None
if dialog is not None:
dialog.show()
dpg.render_dearpygui_frame()
dpg.save_init_file("dpg_layout.ini")
dpg.destroy_context()
def main():
app = App()
app.run()
if __name__ == "__main__":
main()