"""Dear PyGui front-end: edit config.toml, generate markdown, talk to an AI.

The window lets the user maintain the file / screenshot / discussion lists
stored in ``config.toml``, run ``aggregate.run`` on that config, and send the
result to the provider selected through ``ai_client``.  PowerShell scripts
requested through ``ai_client.confirm_and_run_callback`` are only executed
after the user approves them in a modal dialog.
"""

import threading
import tomllib
from pathlib import Path
from tkinter import filedialog, Tk

import dearpygui.dearpygui as dpg
import tomli_w

import aggregate
import ai_client
from ai_client import ProviderError
import shell_runner

CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic"]


def load_config() -> dict:
    """Parse ``CONFIG_PATH`` as TOML and return the resulting dict."""
    with open(CONFIG_PATH, "rb") as f:
        return tomllib.load(f)


def save_config(config: dict) -> None:
    """Serialize *config* as TOML and overwrite ``CONFIG_PATH`` with it."""
    with open(CONFIG_PATH, "wb") as f:
        tomli_w.dump(config, f)


def hide_tk_root() -> Tk:
    """Create a withdrawn, always-on-top Tk root for hosting a file dialog.

    The caller owns the returned root and must ``destroy()`` it.
    """
    root = Tk()
    root.withdraw()
    root.wm_attributes("-topmost", True)
    return root


class ConfirmDialog:
    """
    Modal confirmation window for a proposed PowerShell script.

    Background thread calls wait(), which blocks on a threading.Event.
    Main render loop detects _pending_dialog and calls show() on the next frame.
    User clicks Approve or Reject, which sets the event and unblocks the thread.
    """

    # Monotonic counter used to give every dialog a unique item tag.
    _next_id = 0

    def __init__(self, script: str, base_dir: str):
        ConfirmDialog._next_id += 1
        self._uid = ConfirmDialog._next_id
        self._tag = f"confirm_dlg_{self._uid}"
        self._script = script
        self._base_dir = base_dir
        self._event = threading.Event()
        self._approved = False

    def show(self) -> None:
        """Build the modal window, centred in the viewport.

        Called from main thread only.
        """
        w, h = 700, 440
        vp_w = dpg.get_viewport_width()
        vp_h = dpg.get_viewport_height()
        # Clamp at 0 so the window stays reachable on a tiny viewport.
        px = max(0, (vp_w - w) // 2)
        py = max(0, (vp_h - h) // 2)
        with dpg.window(
            label=f"Approve PowerShell Command #{self._uid}",
            tag=self._tag,
            modal=True,
            no_close=True,
            pos=(px, py),
            width=w,
            height=h,
        ):
            dpg.add_text("The AI wants to run the following PowerShell script:")
            dpg.add_text(f"base_dir: {self._base_dir}", color=(200, 200, 100))
            dpg.add_separator()
            # Editable on purpose: the approved script is re-read from this
            # widget in _cb_approve, so the user can amend it before running.
            dpg.add_input_text(
                tag=f"{self._tag}_script",
                default_value=self._script,
                multiline=True,
                width=-1,
                height=-72,
                readonly=False,
            )
            dpg.add_separator()
            with dpg.group(horizontal=True):
                dpg.add_button(label="Approve & Run", callback=self._cb_approve)
                dpg.add_button(label="Reject", callback=self._cb_reject)

    def _resolve(self, approved: bool) -> None:
        """Record the decision, release the waiting thread, close the window."""
        self._approved = approved
        # Release the waiter before touching the UI so it cannot stay blocked
        # if removing the window fails.
        self._event.set()
        if dpg.does_item_exist(self._tag):
            dpg.delete_item(self._tag)

    def _cb_approve(self) -> None:
        """Button callback: accept the (possibly edited) script."""
        if self._event.is_set():
            # Already decided (e.g. double click); the widgets are gone.
            return
        self._script = dpg.get_value(f"{self._tag}_script")
        self._resolve(True)

    def _cb_reject(self) -> None:
        """Button callback: refuse the script."""
        if self._event.is_set():
            return
        self._resolve(False)

    def wait(self) -> tuple[bool, str]:
        """Block until the user acts; return ``(approved, script)``.

        Called from background thread.  ``script`` is the text that was in the
        dialog when Approve was clicked, or the original script on Reject.
        """
        self._event.wait()
        return self._approved, self._script


class App:
    """Application state plus all Dear PyGui windows and callbacks."""

    def __init__(self):
        self.config = load_config()
        self.files: list[str] = list(self.config["files"].get("paths", []))
        self.screenshots: list[str] = list(
            self.config.get("screenshots", {}).get("paths", [])
        )
        self.history: list[str] = list(
            self.config.get("discussion", {}).get("history", [])
        )

        ai_cfg = self.config.get("ai", {})
        self.current_provider: str = ai_cfg.get("provider", "gemini")
        self.current_model: str = ai_cfg.get("model", "gemini-2.0-flash")
        self.available_models: list[str] = []

        self.ai_status = "idle"
        self.ai_response = ""
        self.last_md = ""
        self.last_md_path: Path | None = None
        self.send_thread: threading.Thread | None = None
        self.models_thread: threading.Thread | None = None

        # Hand-off slot between the thread requesting a confirmation and the
        # render loop in run(), which is the one that actually shows it.
        self._pending_dialog: ConfirmDialog | None = None
        self._pending_dialog_lock = threading.Lock()
        # (script, result) pairs, in call order.
        self._tool_log: list[tuple[str, str]] = []

        ai_client.set_provider(self.current_provider, self.current_model)
        ai_client.confirm_and_run_callback = self._confirm_and_run

    # ---------------------------------------------------------------- tool execution

    def _confirm_and_run(self, script: str, base_dir: str) -> str | None:
        """Ask the user to approve *script*, then run it in PowerShell.

        Installed as ``ai_client.confirm_and_run_callback``.  Blocks the
        calling thread until the dialog is answered.

        Returns:
            The output of ``shell_runner.run_powershell``, or ``None`` when
            the user rejected the script.
        """
        dialog = ConfirmDialog(script, base_dir)
        with self._pending_dialog_lock:
            self._pending_dialog = dialog

        approved, final_script = dialog.wait()
        if not approved:
            self._append_tool_log(final_script, "REJECTED by user")
            return None

        self._update_status("running powershell...")
        output = shell_runner.run_powershell(final_script, base_dir)
        self._append_tool_log(final_script, output)
        self._update_status("powershell done, awaiting AI...")
        return output

    def _append_tool_log(self, script: str, result: str) -> None:
        """Record one tool call and refresh the Tool Calls window."""
        self._tool_log.append((script, result))
        self._rebuild_tool_log()

    def _rebuild_tool_log(self) -> None:
        """Re-create the Tool Calls list from ``self._tool_log``."""
        if not dpg.does_item_exist("tool_log_scroll"):
            return
        dpg.delete_item("tool_log_scroll", children_only=True)
        for i, (script, result) in enumerate(self._tool_log, 1):
            with dpg.group(parent="tool_log_scroll"):
                dpg.add_text(f"Call #{i}", color=(140, 200, 255))
                dpg.add_input_text(
                    default_value=script,
                    multiline=True,
                    readonly=True,
                    width=-1,
                    height=72,
                )
                dpg.add_text("Result:", color=(180, 255, 180))
                dpg.add_input_text(
                    default_value=result,
                    multiline=True,
                    readonly=True,
                    width=-1,
                    height=72,
                )
                dpg.add_separator()

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _ask_directory(title: str) -> str:
        """Show a directory chooser; return the path, or a falsy value on cancel."""
        root = hide_tk_root()
        try:
            return filedialog.askdirectory(title=title)
        finally:
            # Always tear the hidden root down, even if the dialog raised.
            root.destroy()

    @staticmethod
    def _ask_open_filenames(title: str, **options):
        """Show a multi-file chooser; return the selected paths (empty on cancel)."""
        root = hide_tk_root()
        try:
            return filedialog.askopenfilenames(title=title, **options)
        finally:
            root.destroy()

    def _flush_to_config(self) -> None:
        """Copy the current widget values and lists into ``self.config``.

        The discussion box is split on ``---``; blank segments are dropped.
        Nothing is written to disk here; see ``save_config``.
        """
        self.config["output"]["namespace"] = dpg.get_value("namespace")
        self.config["output"]["output_dir"] = dpg.get_value("output_dir")
        self.config["files"]["base_dir"] = dpg.get_value("files_base_dir")
        shots_cfg = self.config.setdefault("screenshots", {})
        shots_cfg["base_dir"] = dpg.get_value("shots_base_dir")
        self.config["files"]["paths"] = self.files
        shots_cfg["paths"] = self.screenshots

        raw = dpg.get_value("discussion_box")
        self.history = [s.strip() for s in raw.split("---") if s.strip()]
        # Update the tables in place so keys this UI does not manage survive
        # a save instead of being silently dropped from config.toml.
        self.config.setdefault("discussion", {})["history"] = self.history
        self.config.setdefault("ai", {}).update(
            provider=self.current_provider,
            model=self.current_model,
        )

    def _do_generate(self) -> tuple[str, Path]:
        """Flush the UI to config, save it, and return ``aggregate.run``'s result.

        Callers treat the result as ``(markdown, output_path)``.
        """
        self._flush_to_config()
        save_config(self.config)
        return aggregate.run(self.config)

    def _update_status(self, status: str) -> None:
        """Remember *status* and mirror it into the status label if it exists."""
        self.ai_status = status
        if dpg.does_item_exist("ai_status"):
            dpg.set_value("ai_status", f"Status: {status}")

    def _update_response(self, text: str) -> None:
        """Remember *text* and mirror it into the response box if it exists."""
        self.ai_response = text
        if dpg.does_item_exist("ai_response"):
            dpg.set_value("ai_response", text)

    def _rebuild_path_list(self, container: str, paths: list[str], make_remove_cb) -> None:
        """Re-create one "x  <path>" row per entry of *paths* inside *container*.

        ``make_remove_cb(i)`` must return the callback for the row at index
        ``i``.  Does nothing when *container* has not been created yet.
        """
        if not dpg.does_item_exist(container):
            return
        dpg.delete_item(container, children_only=True)
        for i, path in enumerate(paths):
            with dpg.group(horizontal=True, parent=container):
                dpg.add_button(label="x", width=24, callback=make_remove_cb(i))
                dpg.add_text(path)

    def _rebuild_files_list(self) -> None:
        """Refresh the Files window rows from ``self.files``."""
        self._rebuild_path_list("files_scroll", self.files, self._make_remove_file_cb)

    def _rebuild_shots_list(self) -> None:
        """Refresh the Screenshots window rows from ``self.screenshots``."""
        self._rebuild_path_list(
            "shots_scroll", self.screenshots, self._make_remove_shot_cb
        )

    def _rebuild_models_list(self) -> None:
        """Refresh the model listbox from ``self.available_models``.

        If the current model is not offered, the first available model is
        selected and pushed to ``ai_client``.
        """
        if not dpg.does_item_exist("model_listbox"):
            return
        dpg.configure_item("model_listbox", items=self.available_models)
        if self.current_model in self.available_models:
            dpg.set_value("model_listbox", self.current_model)
        elif self.available_models:
            self.current_model = self.available_models[0]
            dpg.set_value("model_listbox", self.current_model)
            ai_client.set_provider(self.current_provider, self.current_model)

    def _make_remove_file_cb(self, idx: int):
        """Return a callback that removes ``self.files[idx]`` and refreshes."""

        def cb():
            # The list is rebuilt after every change, so idx is normally
            # valid; the bound check only guards against a stale row.
            if idx < len(self.files):
                self.files.pop(idx)
                self._rebuild_files_list()

        return cb

    def _make_remove_shot_cb(self, idx: int):
        """Return a callback that removes ``self.screenshots[idx]`` and refreshes."""

        def cb():
            if idx < len(self.screenshots):
                self.screenshots.pop(idx)
                self._rebuild_shots_list()

        return cb

    def _fetch_models(self, provider: str) -> None:
        """Load the model list for *provider* on a daemon thread."""
        self._update_status("fetching models...")

        def do_fetch():
            try:
                models = ai_client.list_models(provider)
                if provider != self.current_provider:
                    # The user switched provider while this request was in
                    # flight; applying it would show (and select) models that
                    # belong to the wrong provider.
                    return
                self.available_models = models
                self._rebuild_models_list()
                self._update_status(f"models loaded: {len(models)}")
            except Exception as e:
                self._update_status(f"model fetch error: {e}")

        self.models_thread = threading.Thread(target=do_fetch, daemon=True)
        self.models_thread.start()

    # ---------------------------------------------------------------- callbacks

    def cb_browse_output(self):
        """Pick the output directory and put it in the Output Dir field."""
        d = self._ask_directory("Select Output Dir")
        if d:
            dpg.set_value("output_dir", d)

    def cb_save_config(self):
        """Write the current UI state to ``config.toml``."""
        self._flush_to_config()
        save_config(self.config)
        self._update_status("config saved")

    def cb_browse_files_base(self):
        """Pick the files base directory."""
        d = self._ask_directory("Select Files Base Dir")
        if d:
            dpg.set_value("files_base_dir", d)

    def cb_add_files(self):
        """Add the chosen files to the list, skipping ones already present."""
        paths = self._ask_open_filenames("Select Files")
        for p in paths:
            if p not in self.files:
                self.files.append(p)
        self._rebuild_files_list()

    def cb_add_wildcard(self):
        """Add a recursive ``<dir>/**/*`` pattern for the chosen directory."""
        d = self._ask_directory("Select Dir for Wildcard")
        if d:
            pattern = str(Path(d) / "**" / "*")
            # Same duplicate rule as cb_add_files / cb_add_shots.
            if pattern not in self.files:
                self.files.append(pattern)
            self._rebuild_files_list()

    def cb_browse_shots_base(self):
        """Pick the screenshots base directory."""
        d = self._ask_directory("Select Screenshots Base Dir")
        if d:
            dpg.set_value("shots_base_dir", d)

    def cb_add_shots(self):
        """Add the chosen images to the list, skipping ones already present."""
        paths = self._ask_open_filenames(
            "Select Screenshots",
            filetypes=[
                ("Images", "*.png *.jpg *.jpeg *.gif *.bmp *.webp"),
                ("All", "*.*"),
            ],
        )
        for p in paths:
            if p not in self.screenshots:
                self.screenshots.append(p)
        self._rebuild_shots_list()

    def cb_add_excerpt(self):
        """Append a ``---`` separator line to the discussion box."""
        current = dpg.get_value("discussion_box")
        dpg.set_value("discussion_box", current + "\n---\n")

    def cb_clear_discussion(self):
        """Empty the discussion box (not saved until Save is pressed)."""
        dpg.set_value("discussion_box", "")

    def cb_save_discussion(self):
        """Write the current UI state, including the discussion, to disk."""
        self._flush_to_config()
        save_config(self.config)
        self._update_status("discussion saved")

    def cb_md_only(self):
        """Generate the markdown without sending anything to the AI."""
        try:
            md, path = self._do_generate()
            self.last_md = md
            self.last_md_path = path
            self._update_status(f"md written: {path.name}")
        except Exception as e:
            # UI boundary: surface the failure in the status line.
            self._update_status(f"error: {e}")

    def cb_reset_session(self):
        """Reset the AI session and clear the tool log and response box."""
        ai_client.reset_session()
        self._tool_log.clear()
        self._rebuild_tool_log()
        self._update_status("session reset")
        self._update_response("")

    def cb_generate_send(self):
        """Generate the markdown and send it with the message on a daemon thread.

        Ignored while a previous send is still running.
        """
        if self.send_thread and self.send_thread.is_alive():
            return
        try:
            md, path = self._do_generate()
            self.last_md = md
            self.last_md_path = path
        except Exception as e:
            self._update_status(f"generate error: {e}")
            return

        self._update_status("sending...")
        # Read widget values now so the worker uses what was on screen when
        # the button was pressed.
        user_msg = dpg.get_value("ai_input")
        base_dir = dpg.get_value("files_base_dir")

        def do_send():
            try:
                # Send the markdown generated for this request rather than
                # whatever self.last_md holds when the thread gets to run.
                response = ai_client.send(md, user_msg, base_dir)
                self._update_response(response)
                self._update_status("done")
            except Exception as e:
                self._update_response(f"ERROR: {e}")
                self._update_status("error")

        self.send_thread = threading.Thread(target=do_send, daemon=True)
        self.send_thread.start()

    def cb_provider_changed(self, sender, app_data):
        """Switch provider, reset the session and reload the model list."""
        self.current_provider = app_data
        ai_client.reset_session()
        ai_client.set_provider(self.current_provider, self.current_model)
        self.available_models = []
        self._rebuild_models_list()
        self._fetch_models(self.current_provider)

    def cb_model_changed(self, sender, app_data):
        """Switch model (if one was selected) and reset the session."""
        if app_data:
            self.current_model = app_data
            ai_client.reset_session()
            ai_client.set_provider(self.current_provider, self.current_model)
            self._update_status(f"model set: {self.current_model}")

    def cb_fetch_models(self):
        """Reload the model list for the current provider."""
        self._fetch_models(self.current_provider)

    def cb_clear_tool_log(self):
        """Forget all recorded tool calls."""
        self._tool_log.clear()
        self._rebuild_tool_log()

    # ---------------------------------------------------------------- build ui

    def _build_ui(self) -> None:
        """Create every window; initial values come from ``self.config``."""
        with dpg.window(
            label="Config",
            tag="win_config",
            pos=(8, 8),
            width=400,
            height=200,
            no_close=True,
        ):
            dpg.add_text("Namespace")
            dpg.add_input_text(
                tag="namespace",
                default_value=self.config["output"]["namespace"],
                width=-1,
            )
            dpg.add_text("Output Dir")
            dpg.add_input_text(
                tag="output_dir",
                default_value=self.config["output"]["output_dir"],
                width=-1,
            )
            with dpg.group(horizontal=True):
                dpg.add_button(label="Browse Output Dir", callback=self.cb_browse_output)
                dpg.add_button(label="Save Config", callback=self.cb_save_config)

        with dpg.window(
            label="Files",
            tag="win_files",
            pos=(8, 216),
            width=400,
            height=500,
            no_close=True,
        ):
            dpg.add_text("Base Dir")
            with dpg.group(horizontal=True):
                dpg.add_input_text(
                    tag="files_base_dir",
                    default_value=self.config["files"]["base_dir"],
                    width=-220,
                )
                dpg.add_button(
                    label="Browse##filesbase", callback=self.cb_browse_files_base
                )
            dpg.add_separator()
            dpg.add_text("Paths")
            with dpg.child_window(tag="files_scroll", height=-64, border=True):
                pass
            # Rows are attached via an explicit parent, so this can run after
            # the (empty) container has been closed.
            self._rebuild_files_list()
            dpg.add_separator()
            with dpg.group(horizontal=True):
                dpg.add_button(label="Add File(s)", callback=self.cb_add_files)
                dpg.add_button(label="Add Wildcard", callback=self.cb_add_wildcard)

        with dpg.window(
            label="Screenshots",
            tag="win_screenshots",
            pos=(416, 8),
            width=400,
            height=500,
            no_close=True,
        ):
            dpg.add_text("Base Dir")
            with dpg.group(horizontal=True):
                dpg.add_input_text(
                    tag="shots_base_dir",
                    default_value=self.config.get("screenshots", {}).get("base_dir", "."),
                    width=-220,
                )
                dpg.add_button(
                    label="Browse##shotsbase", callback=self.cb_browse_shots_base
                )
            dpg.add_separator()
            dpg.add_text("Paths")
            with dpg.child_window(tag="shots_scroll", height=-48, border=True):
                pass
            self._rebuild_shots_list()
            dpg.add_separator()
            dpg.add_button(label="Add Screenshot(s)", callback=self.cb_add_shots)

        with dpg.window(
            label="Discussion History",
            tag="win_discussion",
            pos=(824, 8),
            width=400,
            height=500,
            no_close=True,
        ):
            dpg.add_input_text(
                tag="discussion_box",
                default_value="\n---\n".join(self.history),
                multiline=True,
                width=-1,
                height=-64,
            )
            dpg.add_separator()
            with dpg.group(horizontal=True):
                dpg.add_button(label="Add Separator", callback=self.cb_add_excerpt)
                dpg.add_button(label="Clear", callback=self.cb_clear_discussion)
                dpg.add_button(label="Save", callback=self.cb_save_discussion)

        with dpg.window(
            label="Provider",
            tag="win_provider",
            pos=(1232, 8),
            width=420,
            height=280,
            no_close=True,
        ):
            dpg.add_text("Provider")
            dpg.add_combo(
                tag="provider_combo",
                items=PROVIDERS,
                default_value=self.current_provider,
                width=-1,
                callback=self.cb_provider_changed,
            )
            dpg.add_separator()
            with dpg.group(horizontal=True):
                dpg.add_text("Model")
                dpg.add_button(label="Fetch Models", callback=self.cb_fetch_models)
            dpg.add_listbox(
                tag="model_listbox",
                items=self.available_models,
                default_value=self.current_model,
                width=-1,
                num_items=6,
                callback=self.cb_model_changed,
            )
            dpg.add_separator()
            dpg.add_text("Status: idle", tag="ai_status")

        with dpg.window(
            label="Message",
            tag="win_message",
            pos=(1232, 296),
            width=420,
            height=280,
            no_close=True,
        ):
            dpg.add_input_text(
                tag="ai_input",
                multiline=True,
                width=-1,
                height=-64,
            )
            dpg.add_separator()
            with dpg.group(horizontal=True):
                dpg.add_button(label="Gen + Send", callback=self.cb_generate_send)
                dpg.add_button(label="MD Only", callback=self.cb_md_only)
                dpg.add_button(label="Reset", callback=self.cb_reset_session)

        with dpg.window(
            label="Response",
            tag="win_response",
            pos=(1232, 584),
            width=420,
            height=300,
            no_close=True,
        ):
            dpg.add_input_text(
                tag="ai_response",
                multiline=True,
                readonly=True,
                width=-1,
                height=-1,
            )

        with dpg.window(
            label="Tool Calls",
            tag="win_tool_log",
            pos=(1232, 892),
            width=420,
            height=300,
            no_close=True,
        ):
            with dpg.group(horizontal=True):
                dpg.add_text("Tool call history")
                dpg.add_button(label="Clear", callback=self.cb_clear_tool_log)
            dpg.add_separator()
            with dpg.child_window(tag="tool_log_scroll", height=-1, border=False):
                pass

    def run(self) -> None:
        """Create the viewport, build the UI and drive the render loop."""
        dpg.create_context()
        try:
            dpg.configure_app(docking=True, docking_space=True)
            dpg.create_viewport(title="manual slop", width=1680, height=1200)
            dpg.setup_dearpygui()
            dpg.show_viewport()
            dpg.maximize_viewport()
            self._build_ui()
            self._fetch_models(self.current_provider)

            while dpg.is_dearpygui_running():
                # Show any pending confirmation dialog on the main thread
                with self._pending_dialog_lock:
                    dialog = self._pending_dialog
                    self._pending_dialog = None
                if dialog is not None:
                    dialog.show()
                dpg.render_dearpygui_frame()
        finally:
            # Release the context even when building the UI or a frame raises.
            dpg.destroy_context()


def main() -> None:
    """Entry point: construct the application and run it."""
    app = App()
    app.run()


if __name__ == "__main__":
    main()