refactor(types): add strict type hints to gui_2.py and gui_legacy.py

Automated pipeline applied 217 type annotations across both UI modules:
- 158 auto -> None return types via AST single-pass
- 25 manual signatures (callbacks, factory methods, complex returns)
- 34 variable type annotations (constants, color tuples, config)

No untyped module-level functions or variables remain in either file (nested background-task closures such as `do_fetch`/`_bg_task` were left unannotated).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 11:01:01 -05:00
parent a2a1447f58
commit c816f65665
3 changed files with 570 additions and 217 deletions

170
gui_2.py
View File

@@ -38,9 +38,9 @@ from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from imgui_bundle import imgui, hello_imgui, immapp
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic", "gemini_cli", "deepseek"]
COMMS_CLAMP_CHARS = 300
# Path to the on-disk TOML configuration, relative to the working directory.
CONFIG_PATH: Path = Path("config.toml")
# AI backend identifiers offered in the provider selection combo.
PROVIDERS: list[str] = ["gemini", "anthropic", "gemini_cli", "deepseek"]
# Max chars shown inline for a heavy comms field before clamping to a scrollable box.
COMMS_CLAMP_CHARS: int = 300
def load_config() -> dict[str, Any]:
with open(CONFIG_PATH, "rb") as f:
@@ -59,25 +59,25 @@ def hide_tk_root() -> Tk:
def vec4(r: float, g: float, b: float, a: float = 1.0) -> imgui.ImVec4:
    """Build an ImVec4 colour from 0-255 RGB channels; alpha is already 0-1."""
    return imgui.ImVec4(r / 255, g / 255, b / 255, a)
C_OUT = vec4(100, 200, 255)
C_IN = vec4(140, 255, 160)
C_REQ = vec4(255, 220, 100)
C_RES = vec4(180, 255, 180)
C_TC = vec4(255, 180, 80)
C_TR = vec4(180, 220, 255)
C_TRS = vec4(200, 180, 255)
C_LBL = vec4(180, 180, 180)
C_VAL = vec4(220, 220, 220)
C_KEY = vec4(140, 200, 255)
C_NUM = vec4(180, 255, 180)
C_SUB = vec4(220, 200, 120)
# Colour palette for comms/log rendering.
# NOTE: vec4() returns imgui.ImVec4, so these are annotated as imgui.ImVec4 —
# the previous tuple[float, ...] annotation did not match the runtime type
# and would be rejected by a strict type checker.
C_OUT: imgui.ImVec4 = vec4(100, 200, 255)
C_IN: imgui.ImVec4 = vec4(140, 255, 160)
C_REQ: imgui.ImVec4 = vec4(255, 220, 100)
C_RES: imgui.ImVec4 = vec4(180, 255, 180)
C_TC: imgui.ImVec4 = vec4(255, 180, 80)
C_TR: imgui.ImVec4 = vec4(180, 220, 255)
C_TRS: imgui.ImVec4 = vec4(200, 180, 255)
C_LBL: imgui.ImVec4 = vec4(180, 180, 180)
C_VAL: imgui.ImVec4 = vec4(220, 220, 220)
C_KEY: imgui.ImVec4 = vec4(140, 200, 255)
C_NUM: imgui.ImVec4 = vec4(180, 255, 180)
C_SUB: imgui.ImVec4 = vec4(220, 200, 120)
DIR_COLORS = {"OUT": C_OUT, "IN": C_IN}
KIND_COLORS = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
HEAVY_KEYS = {"message", "text", "script", "output", "content"}
# Direction/kind -> colour lookup tables for comms entries.
# Values come from vec4(), which returns imgui.ImVec4 — annotate accordingly
# (the previous dict[str, tuple[float, ...]] annotation mismatched the
# runtime value type).
DIR_COLORS: dict[str, imgui.ImVec4] = {"OUT": C_OUT, "IN": C_IN}
KIND_COLORS: dict[str, imgui.ImVec4] = {"request": C_REQ, "response": C_RES, "tool_call": C_TC, "tool_result": C_TR, "tool_result_send": C_TRS}
# Payload keys whose values are large text bodies and get clamped rendering.
HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"}
DISC_ROLES = ["User", "AI", "Vendor API", "System"]
AGENT_TOOL_NAMES = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]
DISC_ROLES: list[str] = ["User", "AI", "Vendor API", "System"]
AGENT_TOOL_NAMES: list[str] = ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]
def truncate_entries(entries: list[dict[str, Any]], max_pairs: int) -> list[dict[str, Any]]:
if max_pairs <= 0:
@@ -576,7 +576,7 @@ class App:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _load_active_project(self):
def _load_active_project(self) -> None:
if self.active_project_path and Path(self.active_project_path).exists():
try:
self.project = project_manager.load_project(self.active_project_path)
@@ -599,7 +599,7 @@ class App:
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _switch_project(self, path: str):
def _switch_project(self, path: str) -> None:
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
@@ -616,7 +616,7 @@ class App:
ai_client.reset_session()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self):
def _refresh_from_project(self) -> None:
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
@@ -668,7 +668,7 @@ class App:
if track_history:
self.disc_entries = _parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str):
def _cb_load_track(self, track_id: str) -> None:
state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
if state:
try:
@@ -699,7 +699,7 @@ class App:
self.ai_status = f"Load track error: {e}"
print(f"Error loading track {track_id}: {e}")
def _save_active_project(self):
def _save_active_project(self) -> None:
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
@@ -715,7 +715,7 @@ class App:
self._discussion_names_dirty = False
return self._discussion_names_cache
def _switch_discussion(self, name: str):
def _switch_discussion(self, name: str) -> None:
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
@@ -729,7 +729,7 @@ class App:
self.disc_entries = _parse_history_entries(disc_data.get("history", []), self.disc_roles)
self.ai_status = f"discussion: {name}"
def _flush_disc_entries_to_project(self):
def _flush_disc_entries_to_project(self) -> None:
history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries]
if self.active_track:
project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir)
@@ -740,7 +740,7 @@ class App:
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str):
def _create_discussion(self, name: str) -> None:
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
@@ -750,7 +750,7 @@ class App:
self._discussion_names_dirty = True
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str):
def _rename_discussion(self, old_name: str, new_name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
@@ -764,7 +764,7 @@ class App:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str):
def _delete_discussion(self, name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
@@ -779,7 +779,7 @@ class App:
self._switch_discussion(remaining[0])
# ---------------------------------------------------------------- logic
def _on_comms_entry(self, entry: dict):
def _on_comms_entry(self, entry: dict) -> None:
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
# If this is a history_add kind, route it to history queue instead
@@ -796,17 +796,17 @@ class App:
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str):
def _on_tool_log(self, script: str, result: str) -> None:
session_logger.log_tool_call(script, result, None)
with self._pending_tool_calls_lock:
self._pending_tool_calls.append((script, result, time.time()))
def _on_api_event(self, *args, **kwargs):
def _on_api_event(self, *args, **kwargs) -> None:
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
def _on_performance_alert(self, message: str):
def _on_performance_alert(self, message: str) -> None:
"""Called by PerformanceMonitor when a threshold is exceeded."""
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
# Inject into history as a 'System' message
@@ -817,7 +817,7 @@ class App:
"ts": project_manager.now_ts()
})
def _process_pending_gui_tasks(self):
def _process_pending_gui_tasks(self) -> None:
if not self._pending_gui_tasks:
return
with self._pending_gui_tasks_lock:
@@ -932,7 +932,7 @@ class App:
except Exception as e:
print(f"Error executing GUI task: {e}")
def _handle_approve_script(self):
def _handle_approve_script(self) -> None:
"""Logic for approving a pending script via API hooks."""
print("[DEBUG] _handle_approve_script called")
with self._pending_dialog_lock:
@@ -946,7 +946,7 @@ class App:
else:
print("[DEBUG] No pending dialog to approve")
def _handle_reject_script(self):
def _handle_reject_script(self) -> None:
"""Logic for rejecting a pending script via API hooks."""
print("[DEBUG] _handle_reject_script called")
with self._pending_dialog_lock:
@@ -960,7 +960,7 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None):
def _handle_mma_respond(self, approved: bool, payload: str = None, abort: bool = False, prompt: str = None, context_md: str = None) -> None:
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
@@ -985,7 +985,7 @@ class App:
dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self):
def _handle_approve_ask(self) -> None:
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
@@ -1003,7 +1003,7 @@ class App:
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self):
def _handle_reject_ask(self) -> None:
"""Responds with rejection for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
@@ -1021,7 +1021,7 @@ class App:
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self):
def _handle_reset_session(self) -> None:
"""Logic for resetting the AI session."""
ai_client.reset_session()
ai_client.clear_comms_log()
@@ -1037,7 +1037,7 @@ class App:
self.ai_response = ""
self.ui_ai_input = ""
def _handle_md_only(self):
def _handle_md_only(self) -> None:
"""Logic for the 'MD Only' action."""
try:
md, path, *_ = self._do_generate()
@@ -1049,7 +1049,7 @@ class App:
except Exception as e:
self.ai_status = f"error: {e}"
def _handle_generate_send(self):
def _handle_generate_send(self) -> None:
"""Logic for the 'Gen + Send' action."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
@@ -1076,13 +1076,13 @@ class App:
self._loop
)
def _run_event_loop(self):
def _run_event_loop(self) -> None:
"""Runs the internal asyncio event loop."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
self._loop.run_forever()
def shutdown(self):
def shutdown(self) -> None:
"""Cleanly shuts down the app's background tasks."""
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
@@ -1094,7 +1094,7 @@ class App:
if self.models_thread and self.models_thread.is_alive():
self.models_thread.join(timeout=1.0)
async def _process_event_queue(self):
async def _process_event_queue(self) -> None:
"""Listens for and processes events from the AsyncEventQueue."""
while True:
event_name, payload = await self.event_queue.get()
@@ -1115,7 +1115,7 @@ class App:
"payload": payload
})
def _handle_request_event(self, event: events.UserRequestEvent):
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
"""Processes a UserRequestEvent by calling the AI client."""
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
@@ -1147,14 +1147,14 @@ class App:
self._loop
)
def _test_callback_func_write_to_file(self, data: str):
def _test_callback_func_write_to_file(self, data: str) -> None:
"""A dummy function that a custom_callback would execute for testing."""
# Note: This file path is relative to where the test is run.
# This is for testing purposes only.
with open("temp_callback_output.txt", "w") as f:
f.write(data)
def _recalculate_session_usage(self):
def _recalculate_session_usage(self) -> None:
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
@@ -1164,7 +1164,7 @@ class App:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict, md_content: str | None = None):
def _refresh_api_metrics(self, payload: dict, md_content: str | None = None) -> None:
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
@@ -1184,7 +1184,7 @@ class App:
size_bytes = cache_stats.get("total_size_bytes", 0)
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def cb_load_prior_log(self):
def cb_load_prior_log(self) -> None:
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
@@ -1249,7 +1249,7 @@ class App:
self.ai_status = "powershell done, awaiting AI..."
return output
def resolve_pending_action(self, action_id: str, approved: bool):
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
"""Resolves a pending PowerShell script confirmation by its ID.
Args:
@@ -1276,7 +1276,7 @@ class App:
return True
return False
def _append_tool_log(self, script: str, result: str):
def _append_tool_log(self, script: str, result: str) -> None:
self._tool_log.append((script, result, time.time()))
self.ui_last_script_text = script
self.ui_last_script_output = result
@@ -1285,7 +1285,7 @@ class App:
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def _flush_to_project(self):
def _flush_to_project(self) -> None:
proj = self.project
proj.setdefault("output", {})["output_dir"] = self.ui_output_dir
proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir
@@ -1320,7 +1320,7 @@ class App:
else:
mma_sec["active_track"] = None
def _flush_to_config(self):
def _flush_to_config(self) -> None:
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
@@ -1352,7 +1352,7 @@ class App:
discussion_text = aggregate.build_discussion_text(history)
return full_md, path, file_items, stable_md, discussion_text
def _fetch_models(self, provider: str):
def _fetch_models(self, provider: str) -> None:
self.ai_status = "fetching models..."
def do_fetch():
@@ -1369,13 +1369,13 @@ class App:
self.models_thread.start()
# ---------------------------------------------------------------- helpers
def _render_text_viewer(self, label: str, content: str):
def _render_text_viewer(self, label: str, content: str) -> None:
if imgui.button("[+]##" + str(id(content))):
self.show_text_viewer = True
self.text_viewer_title = label
self.text_viewer_content = content
def _render_heavy_text(self, label: str, content: str):
def _render_heavy_text(self, label: str, content: str) -> None:
imgui.text_colored(C_LBL, f"{label}:")
imgui.same_line()
if imgui.button("[+]##" + label):
@@ -1400,7 +1400,7 @@ class App:
imgui.text(content if content else "(empty)")
# ---------------------------------------------------------------- gui
def _show_menus(self):
def _show_menus(self) -> None:
if imgui.begin_menu("Windows"):
for w in self.show_windows.keys():
_, self.show_windows[w] = imgui.menu_item(w, "", self.show_windows[w])
@@ -1429,7 +1429,7 @@ class App:
self.ai_status = f"error: {e}"
imgui.end_menu()
def _gui_func(self):
def _gui_func(self) -> None:
try:
self.perf_monitor.start_frame()
# Process GUI task queue
@@ -1817,7 +1817,7 @@ class App:
import traceback
traceback.print_exc()
def _render_projects_panel(self):
def _render_projects_panel(self) -> None:
proj_name = self.project.get("project", {}).get("name", Path(self.active_project_path).stem)
imgui.text_colored(C_IN, f"Active: {proj_name}")
imgui.separator()
@@ -1910,7 +1910,7 @@ class App:
if imgui.button('Plan Epic (Tier 1)', imgui.ImVec2(-1, 0)):
self._cb_plan_epic()
def _cb_plan_epic(self):
def _cb_plan_epic(self) -> None:
def _bg_task():
try:
self.ai_status = "Planning Epic (Tier 1)..."
@@ -1937,14 +1937,14 @@ class App:
print(f"ERROR in _cb_plan_epic background task: {e}")
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_accept_tracks(self):
def _cb_accept_tracks(self) -> None:
def _bg_task():
for track_data in self.proposed_tracks:
self._start_track_logic(track_data)
self.ai_status = "Tracks accepted and execution started."
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data=None):
def _cb_start_track(self, user_data: Any = None) -> None:
idx = 0
if isinstance(user_data, int):
idx = user_data
@@ -1956,7 +1956,7 @@ class App:
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
self.ai_status = f"Track '{title}' started."
def _start_track_logic(self, track_data):
def _start_track_logic(self, track_data: dict[str, Any]) -> None:
try:
goal = track_data.get("goal", "")
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
@@ -2019,7 +2019,7 @@ class App:
self.ai_status = f"Track start error: {e}"
print(f"ERROR in _start_track_logic: {e}")
def _render_track_proposal_modal(self):
def _render_track_proposal_modal(self) -> None:
if self._show_track_proposal_modal:
imgui.open_popup("Track Proposal")
if imgui.begin_popup_modal("Track Proposal", True, imgui.WindowFlags_.always_auto_resize)[0]:
@@ -2044,7 +2044,7 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
def _render_log_management(self):
def _render_log_management(self) -> None:
exp, self.show_windows["Log Management"] = imgui.begin("Log Management", self.show_windows["Log Management"])
if not exp:
imgui.end()
@@ -2103,7 +2103,7 @@ class App:
imgui.end_table()
imgui.end()
def _render_files_panel(self):
def _render_files_panel(self) -> None:
imgui.text("Base Dir")
ch, self.ui_files_base_dir = imgui.input_text("##f_base", self.ui_files_base_dir)
imgui.same_line()
@@ -2135,7 +2135,7 @@ class App:
r.destroy()
if d: self.files.append(str(Path(d) / "**" / "*"))
def _render_screenshots_panel(self):
def _render_screenshots_panel(self) -> None:
imgui.text("Base Dir")
ch, self.ui_shots_base_dir = imgui.input_text("##s_base", self.ui_shots_base_dir)
imgui.same_line()
@@ -2164,7 +2164,7 @@ class App:
for p in paths:
if p not in self.screenshots: self.screenshots.append(p)
def _render_discussion_panel(self):
def _render_discussion_panel(self) -> None:
# THINKING indicator
is_thinking = self.ai_status in ["sending..."]
if is_thinking:
@@ -2354,7 +2354,7 @@ class App:
self._scroll_disc_to_bottom = False
imgui.end_child()
def _render_provider_panel(self):
def _render_provider_panel(self) -> None:
imgui.text("Provider")
if imgui.begin_combo("##prov", self.current_provider):
for p in PROVIDERS:
@@ -2413,7 +2413,7 @@ class App:
if self._gemini_cache_text:
imgui.text_colored(C_SUB, self._gemini_cache_text)
def _render_message_panel(self):
def _render_message_panel(self) -> None:
# LIVE indicator
is_live = self.ai_status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
if is_live:
@@ -2446,7 +2446,7 @@ class App:
if self.ui_ai_input:
self.disc_entries.append({"role": "User", "content": self.ui_ai_input, "collapsed": False, "ts": project_manager.now_ts()})
def _render_response_panel(self):
def _render_response_panel(self) -> None:
if self._trigger_blink:
self._trigger_blink = False
self._is_blinking = True
@@ -2482,7 +2482,7 @@ class App:
if is_blinking:
imgui.pop_style_color(2)
def _cb_ticket_retry(self, ticket_id):
def _cb_ticket_retry(self, ticket_id: str) -> None:
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'todo'
@@ -2492,7 +2492,7 @@ class App:
self._loop
)
def _cb_ticket_skip(self, ticket_id):
def _cb_ticket_skip(self, ticket_id: str) -> None:
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'skipped'
@@ -2502,7 +2502,7 @@ class App:
self._loop
)
def _render_mma_dashboard(self):
def _render_mma_dashboard(self) -> None:
# 1. Track Browser
imgui.text("Track Browser")
if imgui.begin_table("mma_tracks_table", 4, imgui.TableFlags_.borders | imgui.TableFlags_.row_bg | imgui.TableFlags_.resizable):
@@ -2595,7 +2595,7 @@ class App:
else:
imgui.text_disabled("No active MMA track.")
def _render_ticket_dag_node(self, ticket, tickets_by_id, children_map, rendered):
def _render_ticket_dag_node(self, ticket: Ticket, tickets_by_id: dict[str, Ticket], children_map: dict[str, list[str]], rendered: set[str]) -> None:
tid = ticket.get('id', '??')
target = ticket.get('target_file', 'general')
status = ticket.get('status', 'pending').upper()
@@ -2656,7 +2656,7 @@ class App:
imgui.text_disabled(" (shown above)")
imgui.tree_pop()
def _render_tool_calls_panel(self):
def _render_tool_calls_panel(self) -> None:
imgui.text("Tool call history")
imgui.same_line()
if imgui.button("Clear##tc"):
@@ -2728,7 +2728,7 @@ class App:
self._scroll_tool_calls_to_bottom = False
imgui.end_child()
def _render_comms_history_panel(self):
def _render_comms_history_panel(self) -> None:
imgui.text_colored(vec4(200, 220, 160), f"Status: {self.ai_status}")
imgui.same_line()
if imgui.button("Clear##comms"):
@@ -2855,14 +2855,14 @@ class App:
if self.is_viewing_prior_session:
imgui.pop_style_color()
def _render_system_prompts_panel(self):
def _render_system_prompts_panel(self) -> None:
imgui.text("Global System Prompt (all projects)")
ch, self.ui_global_system_prompt = imgui.input_text_multiline("##gsp", self.ui_global_system_prompt, imgui.ImVec2(-1, 100))
imgui.separator()
imgui.text("Project System Prompt")
ch, self.ui_project_system_prompt = imgui.input_text_multiline("##psp", self.ui_project_system_prompt, imgui.ImVec2(-1, 100))
def _render_theme_panel(self):
def _render_theme_panel(self) -> None:
exp, self.show_windows["Theme"] = imgui.begin("Theme", self.show_windows["Theme"])
if exp:
imgui.text("Palette")
@@ -2901,15 +2901,15 @@ class App:
if ch: theme.set_scale(scale)
imgui.end()
def _load_fonts(self):
def _load_fonts(self) -> None:
font_path, font_size = theme.get_font_loading_params()
if font_path and Path(font_path).exists():
hello_imgui.load_font(font_path, font_size)
def _post_init(self):
def _post_init(self) -> None:
theme.apply_current()
def run(self):
def run(self) -> None:
"""Initializes the ImGui runner and starts the main application loop."""
if "--headless" in sys.argv:
print("Headless mode active")
@@ -2949,7 +2949,7 @@ class App:
save_config(self.config)
session_logger.close_session()
def main():
def main() -> None:
    """Entry point: construct the App and hand control to its main loop."""
    App().run()

View File

@@ -30,17 +30,17 @@ import theme
import mcp_client
from performance_monitor import PerformanceMonitor
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic"]
CONFIG_PATH: Path = Path("config.toml")
PROVIDERS: list[str] = ["gemini", "anthropic"]
# Max chars shown inline for a heavy comms field before clamping to a scrollable box
COMMS_CLAMP_CHARS = 300
COMMS_CLAMP_CHARS: int = 300
def load_config() -> dict:
    """Parse CONFIG_PATH as TOML and return the resulting dict.

    The file is opened in binary mode, as tomllib.load requires.
    """
    with CONFIG_PATH.open("rb") as fh:
        return tomllib.load(fh)
def save_config(config: dict):
def save_config(config: dict) -> None:
    """Serialize *config* back to CONFIG_PATH as TOML (binary write)."""
    with CONFIG_PATH.open("wb") as fh:
        tomli_w.dump(config, fh)
@@ -75,13 +75,13 @@ def truncate_entries(entries: list[dict], max_pairs: int) -> list[dict]:
return entries[-target_count:]
# ------------------------------------------------------------------ comms rendering helpers
# Direction -> colour
_DIR_COLORS = {
_DIR_COLORS: dict[str, tuple[int, int, int]] = {
"OUT": (100, 200, 255), # blue-ish
"IN": (140, 255, 160), # green-ish
}
# Kind -> colour
_KIND_COLORS = {
_KIND_COLORS: dict[str, tuple[int, int, int]] = {
"request": (255, 220, 100),
"response": (180, 255, 180),
"tool_call": (255, 180, 80),
@@ -89,16 +89,16 @@ _KIND_COLORS = {
"tool_result_send": (200, 180, 255),
}
_HEAVY_KEYS = {"message", "text", "script", "output", "content"}
_HEAVY_KEYS: set[str] = {"message", "text", "script", "output", "content"}
# Label colours used in rich rendering
_LABEL_COLOR = (180, 180, 180)
_VALUE_COLOR = (220, 220, 220)
_KEY_COLOR = (140, 200, 255) # dict key / call index
_NUM_COLOR = (180, 255, 180) # numbers / token counts
_SUBHDR_COLOR = (220, 200, 120) # sub-section header
_LABEL_COLOR: tuple[int, int, int] = (180, 180, 180)
_VALUE_COLOR: tuple[int, int, int] = (220, 220, 220)
_KEY_COLOR: tuple[int, int, int] = (140, 200, 255) # dict key / call index
_NUM_COLOR: tuple[int, int, int] = (180, 255, 180) # numbers / token counts
_SUBHDR_COLOR: tuple[int, int, int] = (220, 200, 120) # sub-section header
def _show_text_viewer(title: str, text: str):
def _show_text_viewer(title: str, text: str) -> None:
if dpg.does_item_exist("win_text_viewer"):
wrap = dpg.get_value("project_word_wrap") if dpg.does_item_exist("project_word_wrap") else False
dpg.configure_item("win_text_viewer", label=f"Text Viewer - {title}", show=True)
@@ -110,7 +110,7 @@ def _show_text_viewer(title: str, text: str):
dpg.configure_item("text_viewer_wrap_container", show=wrap)
dpg.focus_item("win_text_viewer")
def _add_text_field(parent: str, label: str, value: str):
def _add_text_field(parent: str, label: str, value: str) -> None:
"""Render a labelled text value; long values get a scrollable box."""
wrap = dpg.get_value("project_word_wrap") if dpg.does_item_exist("project_word_wrap") else False
with dpg.group(horizontal=False, parent=parent):
@@ -134,13 +134,13 @@ def _add_text_field(parent: str, label: str, value: str):
# Short selectable text
dpg.add_input_text(default_value=value if value else "(empty)", readonly=True, width=-1)
def _add_kv_row(parent: str, key: str, val, val_color=None):
def _add_kv_row(parent: str, key: str, val: Any, val_color: tuple[int, int, int] | None = None) -> None:
    """Render one "key: value" row, laid out horizontally under *parent*.

    NOTE(review): val_color is accepted but never applied to the value
    widget in this body — confirm whether it was meant to be wired up.
    """
    with dpg.group(horizontal=True, parent=parent):
        label = f"{key}:"
        dpg.add_text(label, color=_LABEL_COLOR)
        dpg.add_input_text(default_value=str(val), readonly=True, width=-1)
def _render_usage(parent: str, usage: dict):
def _render_usage(parent: str, usage: dict) -> None:
"""Render Anthropic usage dict as a compact token table."""
if not usage:
return
@@ -160,7 +160,7 @@ def _render_usage(parent: str, usage: dict):
if key not in shown:
_add_kv_row(parent, f" {key}", val, _NUM_COLOR)
def _render_tool_calls_list(parent: str, tool_calls: list):
def _render_tool_calls_list(parent: str, tool_calls: list) -> None:
"""Render a list of tool_call dicts inline."""
if not tool_calls:
dpg.add_text(" (none)", color=_VALUE_COLOR, parent=parent)
@@ -177,10 +177,10 @@ def _render_tool_calls_list(parent: str, tool_calls: list):
_add_text_field(parent, " args", str(args))
# ---- kind-specific renderers ------------------------------------------------
def _render_payload_request(parent: str, payload: dict):
def _render_payload_request(parent: str, payload: dict) -> None:
    """Render an outgoing request payload: just its 'message' field."""
    message = payload.get("message", "")
    _add_text_field(parent, "message", message)
def _render_payload_response(parent: str, payload: dict):
def _render_payload_response(parent: str, payload: dict) -> None:
_add_kv_row(parent, "round", payload.get("round", ""))
_add_kv_row(parent, "stop_reason", payload.get("stop_reason", ""), (255, 200, 120))
text = payload.get("text", "")
@@ -192,7 +192,7 @@ def _render_payload_response(parent: str, payload: dict):
if usage:
_render_usage(parent, usage)
def _render_payload_tool_call(parent: str, payload: dict):
def _render_payload_tool_call(parent: str, payload: dict) -> None:
_add_kv_row(parent, "name", payload.get("name", ""))
if "id" in payload:
_add_kv_row(parent, "id", payload["id"])
@@ -207,19 +207,19 @@ def _render_payload_tool_call(parent: str, payload: dict):
else:
_add_text_field(parent, "args", str(args))
def _render_payload_tool_result(parent: str, payload: dict):
def _render_payload_tool_result(parent: str, payload: dict) -> None:
    """Render a tool_result payload: tool name, optional call id, output text."""
    name = payload.get("name", "")
    _add_kv_row(parent, "name", name)
    if "id" in payload:
        _add_kv_row(parent, "id", payload["id"])
    output = payload.get("output", "")
    _add_text_field(parent, "output", output)
def _render_payload_tool_result_send(parent: str, payload: dict):
def _render_payload_tool_result_send(parent: str, payload: dict) -> None:
    """Render each tool result being sent back: indexed header, id, content."""
    results = payload.get("results", [])
    for idx, result in enumerate(results):
        dpg.add_text(f"result[{idx}]", color=_KEY_COLOR, parent=parent)
        _add_kv_row(parent, " tool_use_id", result.get("tool_use_id", ""))
        _add_text_field(parent, " content", str(result.get("content", "")))
def _render_payload_generic(parent: str, payload: dict):
def _render_payload_generic(parent: str, payload: dict) -> None:
"""Fallback: render any unknown payload kind as labelled fields."""
import json
for key, val in payload.items():
@@ -232,7 +232,7 @@ def _render_payload_generic(parent: str, payload: dict):
else:
_add_kv_row(parent, key, val_str)
_KIND_RENDERERS = {
_KIND_RENDERERS: dict[str, Callable] = {
"request": _render_payload_request,
"response": _render_payload_response,
"tool_call": _render_payload_tool_call,
@@ -240,7 +240,7 @@ _KIND_RENDERERS = {
"tool_result_send": _render_payload_tool_result_send,
}
def _render_comms_entry(parent: str, entry: dict, idx: int):
def _render_comms_entry(parent: str, entry: dict, idx: int) -> None:
direction = entry["direction"]
kind = entry["kind"]
ts = entry["ts"]
@@ -269,9 +269,9 @@ class ConfirmDialog:
Main render loop detects _pending_dialog and calls show() on the next frame.
User clicks Approve or Reject, which sets the event and unblocks the thread.
"""
_next_id = 0
_next_id: int = 0
def __init__(self, script: str, base_dir: str):
def __init__(self, script: str, base_dir: str) -> None:
ConfirmDialog._next_id += 1
self._uid = ConfirmDialog._next_id
self._tag = f"confirm_dlg_{self._uid}"
@@ -281,7 +281,7 @@ class ConfirmDialog:
self._event = threading.Event()
self._approved = False
def show(self):
def show(self) -> None:
"""Called from main thread only. Wrapped in try/except to prevent thread lockups."""
try:
w, h = 700, 480
@@ -326,7 +326,7 @@ class ConfirmDialog:
self._approved = False
self._event.set()
def _cb_approve(self):
def _cb_approve(self) -> None:
try:
self._script = dpg.get_value(f"{self._tag}_script")
except Exception:
@@ -338,7 +338,7 @@ class ConfirmDialog:
except Exception:
pass
def _cb_reject(self):
def _cb_reject(self) -> None:
self._approved = False
self._event.set()
try:
@@ -351,7 +351,7 @@ class ConfirmDialog:
self._event.wait()
return self._approved, self._script
DISC_ROLES = ["User", "AI", "Vendor API", "System"]
DISC_ROLES: list[str] = ["User", "AI", "Vendor API", "System"]
def _parse_history_entries(history: list[str], roles: list[str] | None = None) -> list[dict]:
"""
@@ -366,7 +366,7 @@ def _parse_history_entries(history: list[str], roles: list[str] | None = None) -
return entries
class App:
def __init__(self):
def __init__(self) -> None:
self.config = load_config()
# Controls whether API hooks are enabled, based on CLI arg or env var
self.test_hooks_enabled: bool = (
@@ -477,7 +477,7 @@ class App:
self._recalculate_session_usage()
# ---------------------------------------------------------------- project loading
def _load_active_project(self):
def _load_active_project(self) -> None:
"""
Load the active project .toml. If no project paths configured or
active path is missing, attempt migration from legacy config.toml.
@@ -507,7 +507,7 @@ class App:
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _switch_project(self, path: str):
def _switch_project(self, path: str) -> None:
"""Switch to a different project .toml file."""
if not Path(path).exists():
self._update_status(f"project file not found: {path}")
@@ -531,7 +531,7 @@ class App:
self._update_response("")
self._update_status(f"switched to: {Path(path).stem}")
def _refresh_from_project(self):
def _refresh_from_project(self) -> None:
"""Reload all GUI state from self.project after a project switch or discussion switch."""
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
@@ -549,7 +549,7 @@ class App:
self._rebuild_disc_roles_list()
self._rebuild_discussion_selector()
def _refresh_project_widgets(self):
def _refresh_project_widgets(self) -> None:
"""Push project-level values into the GUI widgets."""
proj = self.project
if dpg.does_item_exist("output_dir"):
@@ -578,7 +578,7 @@ class App:
dpg.set_value(tag, agent_tools.get(t_name, True))
self.cb_word_wrap_toggled(app_data=proj.get("project", {}).get("word_wrap", True))
def _save_active_project(self):
def _save_active_project(self) -> None:
"""Write self.project to the active project .toml file."""
if self.active_project_path:
try:
@@ -593,7 +593,7 @@ class App:
discussions = disc_sec.get("discussions", {})
return sorted(discussions.keys())
def _switch_discussion(self, name: str):
def _switch_discussion(self, name: str) -> None:
"""Save current discussion entries, then switch to a different one."""
# Save current entries into project
self._flush_disc_entries_to_project()
@@ -611,7 +611,7 @@ class App:
self._rebuild_discussion_selector()
self._update_status(f"discussion: {name}")
def _flush_disc_entries_to_project(self):
def _flush_disc_entries_to_project(self) -> None:
"""Serialize current disc_entries back into the active discussion in self.project."""
# Pull latest content from widgets
for i, entry in enumerate(self.disc_entries):
@@ -625,7 +625,7 @@ class App:
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str):
def _create_discussion(self, name: str) -> None:
"""Create a new empty discussion in the active project."""
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
@@ -635,7 +635,7 @@ class App:
discussions[name] = project_manager.default_discussion()
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str):
def _rename_discussion(self, old_name: str, new_name: str) -> None:
"""Rename a discussion."""
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
@@ -650,7 +650,7 @@ class App:
disc_sec["active"] = new_name
self._rebuild_discussion_selector()
def _delete_discussion(self, name: str):
def _delete_discussion(self, name: str) -> None:
"""Delete a discussion. Cannot delete the last one."""
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
@@ -667,7 +667,7 @@ class App:
else:
self._rebuild_discussion_selector()
def _update_discussion_git_commit(self):
def _update_discussion_git_commit(self) -> None:
"""Update the git commit hash on the active discussion."""
git_dir = self.project.get("project", {}).get("git_dir", "")
if not git_dir:
@@ -687,7 +687,7 @@ class App:
self._rebuild_discussion_selector()
self._update_status(f"commit: {commit[:12]}")
def _queue_history_add(self, role: str, content: str):
def _queue_history_add(self, role: str, content: str) -> None:
"""Safely queue a new history entry from a background thread."""
with self._pending_history_adds_lock:
self._pending_history_adds.append({
@@ -698,17 +698,17 @@ class App:
})
# ---------------------------------------------------------------- comms log
def _on_comms_entry(self, entry: dict):
def _on_comms_entry(self, entry: dict) -> None:
"""Called from background thread; queue for main thread."""
session_logger.log_comms(entry)
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str):
def _on_tool_log(self, script: str, result: str) -> None:
"""Called from background thread when a tool call completes."""
session_logger.log_tool_call(script, result, None)
def _on_performance_alert(self, message: str):
def _on_performance_alert(self, message: str) -> None:
"""Called by PerformanceMonitor when a threshold is exceeded."""
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
# Inject into history as a 'System' message or similar
@@ -719,7 +719,7 @@ class App:
"ts": project_manager.now_ts()
})
def _recalculate_session_usage(self):
def _recalculate_session_usage(self) -> None:
"""Aggregates usage across the session from comms log."""
usage = {
"input_tokens": 0,
@@ -734,7 +734,7 @@ class App:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _flush_pending_comms(self):
def _flush_pending_comms(self) -> None:
"""Called every frame from the main render loop."""
with self._pending_comms_lock:
entries = self._pending_comms[:]
@@ -746,20 +746,20 @@ class App:
self._recalculate_session_usage()
self._update_token_usage()
def _update_token_usage(self):
def _update_token_usage(self) -> None:
if not dpg.does_item_exist("ai_token_usage"):
return
usage = self.session_usage
total = usage["input_tokens"] + usage["output_tokens"]
dpg.set_value("ai_token_usage", f"Tokens: {total} (In: {usage['input_tokens']} Out: {usage['output_tokens']})")
def _on_api_event(self, *args, **kwargs):
def _on_api_event(self, *args, **kwargs) -> None:
"""Callback for ai_client events. Queues a telemetry refresh on the main thread."""
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
def _refresh_api_metrics(self, payload: dict = None):
def _refresh_api_metrics(self, payload: dict = None) -> None:
"""Updates the token budget and cache stats visualizers."""
payload = payload or {}
self._last_bleed_update_time = time.time()
@@ -787,7 +787,7 @@ class App:
# Note: We don't hide it if no stats are in payload,
# to avoid flickering during tool/chunk events that don't include stats.
def _update_performance_diagnostics(self):
def _update_performance_diagnostics(self) -> None:
"""Updates performance diagnostics displays (throttled)."""
now = time.time()
# Update Diagnostics panel (throttled for smoothness)
@@ -814,12 +814,12 @@ class App:
if dpg.does_item_exist("perf_cpu_plot"):
dpg.set_value("perf_cpu_plot", [list(range(100)), self.perf_history["cpu"]])
def _append_comms_entry(self, entry: dict, idx: int):
def _append_comms_entry(self, entry: dict, idx: int) -> None:
if not dpg.does_item_exist("comms_scroll"):
return
_render_comms_entry("comms_scroll", entry, idx)
def _rebuild_comms_log(self):
def _rebuild_comms_log(self) -> None:
"""Full redraw from ai_client.get_comms_log() - used after clear/reset."""
if not dpg.does_item_exist("comms_scroll"):
return
@@ -844,7 +844,7 @@ class App:
self._update_status("powershell done, awaiting AI...")
return output
def _append_tool_log(self, script: str, result: str):
def _append_tool_log(self, script: str, result: str) -> None:
self._last_script = script
self._last_output = result
self._tool_log.append((script, result))
@@ -859,7 +859,7 @@ class App:
dpg.set_value("last_script_output_wrap", result)
self._trigger_script_blink = True
def _rebuild_tool_log(self):
def _rebuild_tool_log(self) -> None:
if not dpg.does_item_exist("tool_log_scroll"):
return
wrap = dpg.get_value("project_word_wrap") if dpg.does_item_exist("project_word_wrap") else False
@@ -893,7 +893,7 @@ class App:
dpg.add_separator()
# ---------------------------------------------------------------- helpers
def _flush_to_project(self):
def _flush_to_project(self) -> None:
"""Pull all widget values into self.project (the active project dict)."""
proj = self.project
# Output
@@ -934,7 +934,7 @@ class App:
if dpg.does_item_exist("auto_add_history"):
disc_sec["auto_add"] = dpg.get_value("auto_add_history")
def _flush_to_config(self):
def _flush_to_config(self) -> None:
"""Pull global settings into self.config (config.toml)."""
self.config["ai"] = {
"provider": self.current_provider,
@@ -959,7 +959,7 @@ class App:
flat = project_manager.flat_config(self.project, self.active_discussion)
return aggregate.run(flat)
def _update_status(self, status: str):
def _update_status(self, status: str) -> None:
self.ai_status = status
if dpg.does_item_exist("ai_status"):
dpg.set_value("ai_status", f"Status: {status}")
@@ -970,14 +970,14 @@ class App:
is_running = status in ["running powershell...", "fetching url...", "searching web..."]
dpg.configure_item("operations_live_indicator", show=is_running)
def _update_response(self, text: str):
def _update_response(self, text: str) -> None:
self.ai_response = text
if dpg.does_item_exist("ai_response"):
dpg.set_value("ai_response", text)
if dpg.does_item_exist("ai_response_wrap"):
dpg.set_value("ai_response_wrap", text)
def _rebuild_files_list(self):
def _rebuild_files_list(self) -> None:
if not dpg.does_item_exist("files_scroll"):
return
dpg.delete_item("files_scroll", children_only=True)
@@ -988,7 +988,7 @@ class App:
)
dpg.add_text(f)
def _rebuild_shots_list(self):
def _rebuild_shots_list(self) -> None:
if not dpg.does_item_exist("shots_scroll"):
return
dpg.delete_item("shots_scroll", children_only=True)
@@ -999,7 +999,7 @@ class App:
)
dpg.add_text(s)
def _rebuild_models_list(self):
def _rebuild_models_list(self) -> None:
if not dpg.does_item_exist("model_listbox"):
return
dpg.configure_item("model_listbox", items=self.available_models)
@@ -1010,7 +1010,7 @@ class App:
dpg.set_value("model_listbox", self.current_model)
ai_client.set_provider(self.current_provider, self.current_model)
def _rebuild_projects_list(self):
def _rebuild_projects_list(self) -> None:
if not dpg.does_item_exist("projects_scroll"):
return
dpg.delete_item("projects_scroll", children_only=True)
@@ -1028,7 +1028,7 @@ class App:
)
dpg.add_text(pp, color=(140, 140, 140))
def _rebuild_discussion_selector(self):
def _rebuild_discussion_selector(self) -> None:
"""Rebuild the discussion selector UI: listbox + metadata for active discussion."""
if not dpg.does_item_exist("disc_selector_group"):
return
@@ -1076,21 +1076,21 @@ class App:
dpg.add_button(label="Rename", tag="btn_disc_rename", callback=self.cb_disc_rename)
dpg.add_button(label="Delete", tag="btn_disc_delete", callback=self.cb_disc_delete)
def _make_remove_file_cb(self, idx: int):
def _make_remove_file_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.files):
self.files.pop(idx)
self._rebuild_files_list()
return cb
def _make_remove_shot_cb(self, idx: int):
def _make_remove_shot_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.screenshots):
self.screenshots.pop(idx)
self._rebuild_shots_list()
return cb
def _make_remove_project_cb(self, idx: int):
def _make_remove_project_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.project_paths):
removed = self.project_paths.pop(idx)
@@ -1099,14 +1099,14 @@ class App:
self._rebuild_projects_list()
return cb
def _make_switch_project_cb(self, path: str):
def _make_switch_project_cb(self, path: str) -> Callable:
def cb():
if path != self.active_project_path:
self._switch_project(path)
self._rebuild_projects_list()
return cb
def _fetch_models(self, provider: str):
def _fetch_models(self, provider: str) -> None:
self._update_status("fetching models...")
def do_fetch():
@@ -1121,7 +1121,7 @@ class App:
self.models_thread.start()
# ---------------------------------------------------------------- callbacks
def cb_word_wrap_toggled(self, sender=None, app_data=None):
def cb_word_wrap_toggled(self, sender: Any = None, app_data: Any = None) -> None:
# This function is now also called by _refresh_project_widgets to set initial state
if app_data is None:
wrap = dpg.get_value("project_word_wrap") if dpg.does_item_exist("project_word_wrap") else False
@@ -1144,28 +1144,28 @@ class App:
self._rebuild_comms_log()
self._rebuild_tool_log()
def cb_browse_output(self):
def cb_browse_output(self) -> None:
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Output Dir")
root.destroy()
if d:
dpg.set_value("output_dir", d)
def cb_save_config(self):
def cb_save_config(self) -> None:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self._update_status("config saved")
def cb_browse_files_base(self):
def cb_browse_files_base(self) -> None:
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Files Base Dir")
root.destroy()
if d:
dpg.set_value("files_base_dir", d)
def cb_add_files(self):
def cb_add_files(self) -> None:
root = hide_tk_root()
paths = filedialog.askopenfilenames(title="Select Files")
root.destroy()
@@ -1174,7 +1174,7 @@ class App:
self.files.append(p)
self._rebuild_files_list()
def cb_add_wildcard(self):
def cb_add_wildcard(self) -> None:
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Dir for Wildcard")
root.destroy()
@@ -1182,14 +1182,14 @@ class App:
self.files.append(str(Path(d) / "**" / "*"))
self._rebuild_files_list()
def cb_browse_shots_base(self):
def cb_browse_shots_base(self) -> None:
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Screenshots Base Dir")
root.destroy()
if d:
dpg.set_value("shots_base_dir", d)
def cb_add_shots(self):
def cb_add_shots(self) -> None:
root = hide_tk_root()
paths = filedialog.askopenfilenames(
title="Select Screenshots",
@@ -1204,7 +1204,7 @@ class App:
self.screenshots.append(p)
self._rebuild_shots_list()
def cb_md_only(self):
def cb_md_only(self) -> None:
try:
md, path, _file_items = self._do_generate()
self.last_md = md
@@ -1213,7 +1213,7 @@ class App:
except Exception as e:
self._update_status(f"error: {e}")
def cb_load_prior_log(self):
def cb_load_prior_log(self) -> None:
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
@@ -1252,7 +1252,7 @@ class App:
except Exception as e:
self._update_status(f"Load error: {e}")
def cb_exit_prior_session(self):
def cb_exit_prior_session(self) -> None:
self.is_viewing_prior_session = False
dpg.configure_item("prior_session_indicator", show=False)
dpg.configure_item("exit_prior_btn", show=False)
@@ -1263,7 +1263,7 @@ class App:
# Restore current session comms
self._rebuild_comms_log()
def cb_reset_session(self):
def cb_reset_session(self) -> None:
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
@@ -1283,7 +1283,7 @@ class App:
self._update_status("session reset")
self._update_response("")
def cb_generate_send(self):
def cb_generate_send(self) -> None:
if self.send_thread and self.send_thread.is_alive():
return
try:
@@ -1337,7 +1337,7 @@ class App:
self.send_thread = threading.Thread(target=do_send, daemon=True)
self.send_thread.start()
def cb_provider_changed(self, sender, app_data):
def cb_provider_changed(self, sender: Any, app_data: Any) -> None:
self.current_provider = app_data
ai_client.reset_session()
ai_client.set_provider(self.current_provider, self.current_model)
@@ -1345,21 +1345,21 @@ class App:
self._rebuild_models_list()
self._fetch_models(self.current_provider)
def cb_model_changed(self, sender, app_data):
def cb_model_changed(self, sender: Any, app_data: Any) -> None:
if app_data:
self.current_model = app_data
ai_client.reset_session()
ai_client.set_provider(self.current_provider, self.current_model)
self._update_status(f"model set: {self.current_model}")
def cb_fetch_models(self):
def cb_fetch_models(self) -> None:
self._fetch_models(self.current_provider)
def cb_clear_tool_log(self):
def cb_clear_tool_log(self) -> None:
self._tool_log.clear()
self._rebuild_tool_log()
def cb_clear_comms(self):
def cb_clear_comms(self) -> None:
ai_client.clear_comms_log()
with self._pending_comms_lock:
self._pending_comms.clear()
@@ -1369,7 +1369,7 @@ class App:
dpg.delete_item("comms_scroll", children_only=True)
# ---- project callbacks ----
def cb_add_project(self):
def cb_add_project(self) -> None:
root = hide_tk_root()
p = filedialog.askopenfilename(
title="Select Project .toml",
@@ -1380,7 +1380,7 @@ class App:
self.project_paths.append(p)
self._rebuild_projects_list()
def cb_new_project(self):
def cb_new_project(self) -> None:
root = hide_tk_root()
p = filedialog.asksaveasfilename(
title="Create New Project .toml",
@@ -1399,7 +1399,7 @@ class App:
self._rebuild_projects_list()
self._update_status(f"created project: {name}")
def _cb_new_project_automated(self, path):
def _cb_new_project_automated(self, path: str) -> None:
"""Automated version of cb_new_project that doesn't show a dialog."""
if not path:
return
@@ -1420,14 +1420,14 @@ class App:
"callback": main_thread_work
})
def cb_browse_git_dir(self):
def cb_browse_git_dir(self) -> None:
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Git Directory")
root.destroy()
if d and dpg.does_item_exist("project_git_dir"):
dpg.set_value("project_git_dir", d)
def cb_browse_main_context(self):
def cb_browse_main_context(self) -> None:
root = hide_tk_root()
p = filedialog.askopenfilename(title="Select Main Context File")
root.destroy()
@@ -1435,11 +1435,11 @@ class App:
dpg.set_value("project_main_context", p)
# ---- discussion callbacks ----
def cb_disc_switch(self, sender, app_data):
def cb_disc_switch(self, sender: Any, app_data: Any) -> None:
if app_data and app_data != self.active_discussion:
self._switch_discussion(app_data)
def cb_disc_create(self):
def cb_disc_create(self) -> None:
if not dpg.does_item_exist("disc_new_name_input"):
return
name = dpg.get_value("disc_new_name_input").strip()
@@ -1449,7 +1449,7 @@ class App:
self._create_discussion(name)
dpg.set_value("disc_new_name_input", "")
def cb_disc_rename(self):
def cb_disc_rename(self) -> None:
if not dpg.does_item_exist("disc_new_name_input"):
return
new_name = dpg.get_value("disc_new_name_input").strip()
@@ -1459,20 +1459,20 @@ class App:
self._rename_discussion(self.active_discussion, new_name)
dpg.set_value("disc_new_name_input", "")
def cb_disc_delete(self):
def cb_disc_delete(self) -> None:
self._delete_discussion(self.active_discussion)
def cb_update_git_commit(self):
def cb_update_git_commit(self) -> None:
self._update_discussion_git_commit()
def cb_disc_save(self):
def cb_disc_save(self) -> None:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self._update_status("discussion saved")
def cb_disc_append_entry(self):
def cb_disc_append_entry(self) -> None:
default_role = self.disc_roles[0] if self.disc_roles else "User"
self.disc_entries.append({
"role": default_role,
@@ -1482,17 +1482,17 @@ class App:
})
self._rebuild_disc_list()
def cb_disc_clear(self):
def cb_disc_clear(self) -> None:
self.disc_entries.clear()
self._rebuild_disc_list()
def cb_disc_truncate(self):
def cb_disc_truncate(self) -> None:
pairs = dpg.get_value("disc_truncate_pairs") if dpg.does_item_exist("disc_truncate_pairs") else 2
self.disc_entries = truncate_entries(self.disc_entries, pairs)
self._rebuild_disc_list()
self._update_status(f"history truncated to {pairs} pairs")
def cb_disc_collapse_all(self):
def cb_disc_collapse_all(self) -> None:
for i, entry in enumerate(self.disc_entries):
tag = f"disc_content_{i}"
if dpg.does_item_exist(tag):
@@ -1500,12 +1500,12 @@ class App:
entry["collapsed"] = True
self._rebuild_disc_list()
def cb_disc_expand_all(self):
def cb_disc_expand_all(self) -> None:
for entry in self.disc_entries:
entry["collapsed"] = False
self._rebuild_disc_list()
def cb_append_message_to_history(self):
def cb_append_message_to_history(self) -> None:
msg = dpg.get_value("ai_input")
if msg:
self.disc_entries.append({
@@ -1516,7 +1516,7 @@ class App:
})
self._rebuild_disc_list()
def cb_append_response_to_history(self):
def cb_append_response_to_history(self) -> None:
resp = self.ai_response
if resp:
self.disc_entries.append({
@@ -1528,7 +1528,7 @@ class App:
self._rebuild_disc_list()
# ---- disc roles ----
def _rebuild_disc_roles_list(self):
def _rebuild_disc_roles_list(self) -> None:
if not dpg.does_item_exist("disc_roles_scroll"):
return
dpg.delete_item("disc_roles_scroll", children_only=True)
@@ -1540,7 +1540,7 @@ class App:
)
dpg.add_text(role)
def _make_disc_remove_role_cb(self, idx: int):
def _make_disc_remove_role_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.disc_roles):
self.disc_roles.pop(idx)
@@ -1548,7 +1548,7 @@ class App:
self._rebuild_disc_list()
return cb
def cb_disc_add_role(self):
def cb_disc_add_role(self) -> None:
if not dpg.does_item_exist("disc_new_role_input"):
return
name = dpg.get_value("disc_new_role_input").strip()
@@ -1559,7 +1559,7 @@ class App:
self._rebuild_disc_list()
# ---- disc entry list ----
def _render_disc_entry(self, i: int, entry: dict):
def _render_disc_entry(self, i: int, entry: dict) -> None:
# Default to collapsed and read-mode if not specified
if "collapsed" not in entry:
entry["collapsed"] = True
@@ -1633,7 +1633,7 @@ class App:
)
dpg.add_separator()
def _cb_toggle_read(self, sender, app_data, user_data):
def _cb_toggle_read(self, sender: Any, app_data: Any, user_data: Any) -> None:
idx = user_data
# Save edit box content before switching to read mode
tag = f"disc_content_{idx}"
@@ -1642,7 +1642,7 @@ class App:
self.disc_entries[idx]["read_mode"] = not self.disc_entries[idx].get("read_mode", False)
self._rebuild_disc_list()
def _rebuild_disc_list(self):
def _rebuild_disc_list(self) -> None:
"""Full rebuild of the discussion UI. Expensive! Use incrementally where possible."""
if not dpg.does_item_exist("disc_scroll"):
return
@@ -1650,19 +1650,19 @@ class App:
for i, entry in enumerate(self.disc_entries):
self._render_disc_entry(i, entry)
def _make_disc_role_cb(self, idx: int):
def _make_disc_role_cb(self, idx: int) -> Callable:
def cb(sender, app_data):
if idx < len(self.disc_entries):
self.disc_entries[idx]["role"] = app_data
return cb
def _make_disc_content_cb(self, idx: int):
def _make_disc_content_cb(self, idx: int) -> Callable:
def cb(sender, app_data):
if idx < len(self.disc_entries):
self.disc_entries[idx]["content"] = app_data
return cb
def _make_disc_insert_cb(self, idx: int):
def _make_disc_insert_cb(self, idx: int) -> Callable:
def cb():
self.disc_entries.insert(idx, {
"role": "User",
@@ -1673,14 +1673,14 @@ class App:
self._rebuild_disc_list()
return cb
def _make_disc_remove_cb(self, idx: int):
def _make_disc_remove_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.disc_entries):
self.disc_entries.pop(idx)
self._rebuild_disc_list()
return cb
def _make_disc_toggle_cb(self, idx: int):
def _make_disc_toggle_cb(self, idx: int) -> Callable:
def cb():
if idx < len(self.disc_entries):
tag = f"disc_content_{idx}"
@@ -1691,17 +1691,17 @@ class App:
return cb
# ------------------------------------------------------------ theme
def cb_palette_changed(self, sender, app_data):
def cb_palette_changed(self, sender: Any, app_data: Any) -> None:
theme.apply(app_data, self._read_colour_overrides())
self._update_status(f"palette: {app_data}")
def cb_apply_font(self):
def cb_apply_font(self) -> None:
path = dpg.get_value("theme_font_path").strip()
size = dpg.get_value("theme_font_size")
theme.apply_font(path, size)
self._update_status(f"font applied: {path or '(default)'} @{size}px")
def cb_browse_font(self):
def cb_browse_font(self) -> None:
root = hide_tk_root()
p = filedialog.askopenfilename(
title="Select Font",
@@ -1712,14 +1712,14 @@ class App:
dpg.set_value("theme_font_path", p)
self.cb_apply_font()
def cb_scale_changed(self, sender, app_data):
def cb_scale_changed(self, sender: Any, app_data: Any) -> None:
theme.set_scale(round(app_data, 2))
def _read_colour_overrides(self) -> dict:
return {}
# ------------------------------------------------------------ build ui
def _build_theme_window(self):
def _build_theme_window(self) -> None:
t_cfg = self.config.get("theme", {})
cur_palette = t_cfg.get("palette", "DPG Default")
cur_font_path = t_cfg.get("font_path", "")
@@ -1775,7 +1775,7 @@ class App:
format="%.2f",
)
def _build_context_hub(self):
def _build_context_hub(self) -> None:
with dpg.window(
label="Context Hub",
tag="win_context_hub",
@@ -1879,7 +1879,7 @@ class App:
dpg.add_separator()
dpg.add_button(label="Add Screenshot(s)", callback=self.cb_add_shots)
def _build_ai_settings_hub(self):
def _build_ai_settings_hub(self) -> None:
with dpg.window(
label="AI Settings Hub",
tag="win_ai_settings_hub",
@@ -1938,7 +1938,7 @@ class App:
height=100,
)
def _build_discussion_hub(self):
def _build_discussion_hub(self) -> None:
with dpg.window(
label="Discussion Hub",
tag="win_discussion_hub",
@@ -2012,7 +2012,7 @@ class App:
dpg.add_separator()
dpg.add_button(label="-> History", callback=self.cb_append_response_to_history)
def _build_operations_hub(self):
def _build_operations_hub(self) -> None:
with dpg.window(
label="Operations Hub",
tag="win_operations_hub",
@@ -2047,7 +2047,7 @@ class App:
with dpg.child_window(tag="tool_log_scroll", height=-1, border=False):
pass
def _build_diagnostics_window(self):
def _build_diagnostics_window(self) -> None:
with dpg.window(
label="Diagnostics",
tag="win_diagnostics",
@@ -2085,7 +2085,7 @@ class App:
dpg.add_line_series(list(range(100)), self.perf_history["cpu"], label="cpu usage", tag="perf_cpu_plot")
dpg.set_axis_limits("axis_cpu_y", 0, 100)
def _build_ui(self):
def _build_ui(self) -> None:
# Performance tracking handlers
with dpg.handler_registry():
dpg.add_mouse_click_handler(callback=lambda: self.perf_monitor.record_input_event())
@@ -2166,7 +2166,7 @@ class App:
with dpg.child_window(tag="text_viewer_wrap_container", width=-1, height=-1, border=False, show=False):
dpg.add_text("", tag="text_viewer_wrap", wrap=0)
def _process_pending_gui_tasks(self):
def _process_pending_gui_tasks(self) -> None:
"""Processes tasks queued from background threads on the main thread."""
if not self._pending_gui_tasks:
return
@@ -2228,7 +2228,7 @@ class App:
except Exception as e:
print(f"Error executing GUI hook task: {e}")
def run(self):
def run(self) -> None:
dpg.create_context()
dpg.configure_app(docking=True, docking_space=True, init_file="dpg_layout.ini")
dpg.create_viewport(title="manual slop", width=1680, height=1200)
@@ -2393,7 +2393,7 @@ class App:
self.hook_server.stop()
dpg.destroy_context()
def main():
def main() -> None:
app = App()
app.run()

353
scripts/apply_type_hints.py Normal file
View File

@@ -0,0 +1,353 @@
"""
Type hint applicator for gui_2.py and gui_legacy.py.
Does a single-pass AST-guided line edit to add type annotations.
No dependency on mcp_client — operates directly on file lines.
Run: uv run python scripts/apply_type_hints.py
"""
import ast
import re
import sys
import os
# Repository root: one directory up from this script's own location.
BASE: str = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Run statistics: counters per edit category, plus human-readable strings
# describing patterns that failed to match (reported at the end of the run).
stats = {"auto_none": 0, "manual_sig": 0, "vars": 0, "errors": []}
def abs_path(filename: str) -> str:
    """Resolve *filename* against the repository root (BASE)."""
    resolved = os.path.join(BASE, filename)
    return resolved
def has_value_return(node: ast.AST) -> bool:
    """Check if function has any 'return <expr>' (not bare return or return None).

    Only the function's own scope is inspected: nested ``def``/``async def``/
    ``lambda`` nodes open a new scope, so their ``return`` statements must not
    count against the enclosing function. (The previous implementation used
    ``ast.walk``, which descends into nested scopes and wrongly disqualified
    factory functions whose inner closures return values.)
    """
    pending = list(ast.iter_child_nodes(node))
    while pending:
        child = pending.pop()
        # Do not descend into nested scopes — their returns belong to them.
        if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
            continue
        if isinstance(child, ast.Return) and child.value is not None:
            # An explicit `return None` is equivalent to a bare return.
            if not (isinstance(child.value, ast.Constant) and child.value.value is None):
                return True
        pending.extend(ast.iter_child_nodes(child))
    return False
def collect_auto_none(tree: ast.Module) -> list[tuple[str, ast.AST]]:
    """Collect functions that can safely get a ``-> None`` annotation.

    A function qualifies when it has no return annotation, never returns a
    value (per has_value_return), and every positional argument other than
    self/cls is already annotated. Returns ``(qualified_name, node)`` pairs;
    method names are dotted with their full class path, e.g.
    ``"Outer.Inner.method"``.
    """
    results: list[tuple[str, ast.AST]] = []

    def scan(scope: ast.AST, prefix: str = "") -> None:
        for node in ast.iter_child_nodes(scope):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                name = f"{prefix}{node.name}" if prefix else node.name
                if node.returns is None and not has_value_return(node):
                    untyped = [
                        a.arg
                        for a in node.args.args
                        if a.arg not in ("self", "cls") and a.annotation is None
                    ]
                    if not untyped:
                        results.append((name, node))
            if isinstance(node, ast.ClassDef):
                # Carry the accumulated prefix so methods of nested classes
                # report fully-qualified names (the old code restarted the
                # prefix at the inner class, dropping the outer path).
                scan(node, prefix=f"{prefix}{node.name}.")

    scan(tree)
    return results
def apply_return_none_single_pass(filepath: str) -> int:
    """Add ``-> None`` to all qualifying functions in a single pass.

    Reads the file at *filepath* (relative to BASE), finds every function
    collect_auto_none() approves, inserts ``' -> None'`` immediately before
    the signature's closing colon, and rewrites the file in place.

    Returns the number of signatures edited. Failures to locate a colon are
    appended to ``stats["errors"]`` rather than raised.
    """
    fp = abs_path(filepath)
    with open(fp, 'r', encoding='utf-8') as f:
        code = f.read()
    tree = ast.parse(code)
    candidates = collect_auto_none(tree)
    if not candidates:
        return 0
    # keepends=True so rewritten lines preserve the file's original EOLs.
    lines = code.splitlines(keepends=True)
    # For each candidate, find the line with the colon that ends the signature
    # and insert ' -> None' before it.
    # We need to find the colon on the signature line (not inside default values etc.)
    # Strategy: the signature ends at body[0].lineno - 1 (last sig line)
    # Find the last ':' on that line that's at the right indentation
    edits: list[tuple[int, int]] = []  # (line_idx, col_of_colon) pairs to apply
    for name, node in candidates:
        if not node.body:
            continue
        # The colon is on the last line of the signature
        # For single-line defs: `def foo(self):` -> colon at end
        # For multi-line defs: last line ends with `):` or similar
        body_start = node.body[0].lineno  # 1-indexed
        sig_last_line_idx = body_start - 2  # 0-indexed, the line before body
        # But for single-line signatures, sig_last_line_idx == node.lineno - 1
        if sig_last_line_idx < node.lineno - 1:
            sig_last_line_idx = node.lineno - 1
        line = lines[sig_last_line_idx]
        # Find the last colon on this line (the def colon)
        # Must handle cases like `def foo(self, x: int):` where there are colons in annotations
        # The def colon is always the LAST colon on the line (before any comment)
        stripped = line.rstrip('\n\r')
        # Remove inline comment. Minimal string-aware scan: tracks one level of
        # quoting so a '#' inside a quoted default value is not treated as a
        # comment. NOTE(review): does not handle escaped quotes or triple-quoted
        # strings — assumed acceptable for signature lines; confirm if defaults
        # ever contain such literals.
        comment_pos = -1
        in_str = False
        str_char = None
        for i, c in enumerate(stripped):
            if in_str:
                if c == str_char:
                    in_str = False
                continue
            if c in ('"', "'"):
                in_str = True
                str_char = c
                continue
            if c == '#':
                comment_pos = i
                break
        code_part = stripped[:comment_pos] if comment_pos >= 0 else stripped
        # Find last colon in code_part
        colon_idx = code_part.rfind(':')
        if colon_idx < 0:
            stats["errors"].append(f"no colon found: {filepath}:{name} L{sig_last_line_idx+1}")
            continue
        # Check not already annotated
        if '->' in code_part:
            continue
        edits.append((sig_last_line_idx, colon_idx))
    # Apply edits in reverse order to preserve line indices
    edits.sort(key=lambda x: x[0], reverse=True)
    count = 0
    for line_idx, colon_col in edits:
        line = lines[line_idx]
        new_line = line[:colon_col] + ' -> None' + line[colon_col:]
        lines[line_idx] = new_line
        count += 1
    # newline='' so the EOLs preserved by splitlines(keepends=True) are
    # written back verbatim, without platform translation.
    with open(fp, 'w', encoding='utf-8', newline='') as f:
        f.writelines(lines)
    return count
# --- Manual signature replacements ---
# These use regex on the def line to do a targeted replacement.
# Each entry: (regex_pattern_for_old_line, replacement_line)
# We match by finding the exact def line and replacing it.
def apply_manual_sigs(filepath: str, sig_replacements: list[tuple[str, str]]) -> int:
    """Apply manual signature replacements.
    sig_replacements: list of (regex_pattern_for_old_line, replacement_line)
    """
    target = abs_path(filepath)
    with open(target, 'r', encoding='utf-8') as handle:
        text = handle.read()
    applied = 0
    for pattern, replacement in sig_replacements:
        candidate = re.sub(pattern, replacement, text, count=1)
        if candidate == text:
            # Pattern matched nothing; record it for the end-of-run report.
            stats["errors"].append(f"manual_sig no match: {filepath}: {pattern[:60]}")
        else:
            text = candidate
            applied += 1
    with open(target, 'w', encoding='utf-8', newline='') as handle:
        handle.write(text)
    return applied
def apply_var_replacements(filepath: str, var_replacements: list[tuple[str, str]]) -> int:
    """Apply variable declaration replacements.
    var_replacements: list of (regex_pattern_for_old_decl, replacement_decl)
    """
    path = abs_path(filepath)
    with open(path, 'r', encoding='utf-8') as fh:
        source = fh.read()
    changed = 0
    for pattern, replacement in var_replacements:
        updated = re.sub(pattern, replacement, source, count=1)
        if updated != source:
            source = updated
            changed += 1
        else:
            # No hit for this declaration pattern; log it instead of failing.
            stats["errors"].append(f"var no match: {filepath}: {pattern[:60]}")
    with open(path, 'w', encoding='utf-8', newline='') as fh:
        fh.write(source)
    return changed
def verify_syntax(filepath: str) -> str:
    """Return a one-line syntax status report for *filepath*.

    Parses the file with ast; the returned string contains "SyntaxError"
    on failure, which is what the driver's ``"Error" in r`` checks key on.
    """
    fp = abs_path(filepath)
    try:
        with open(fp, 'r', encoding='utf-8') as fh:
            ast.parse(fh.read())
    except SyntaxError as exc:
        return f"SyntaxError in {filepath} at line {exc.lineno}: {exc.msg}"
    return f"Syntax OK: {filepath}"
# ============================================================
# gui_2.py manual signatures (Tier 3 items)
# ============================================================
# Each entry is (pattern, replacement): `pattern` is a regex matching the
# exact untyped `def` line currently in gui_2.py; `replacement` is that
# same signature with annotations added. Consumed by apply_manual_sigs,
# which runs each with re.sub(count=1) and logs any pattern that misses.
GUI2_MANUAL_SIGS: list[tuple[str, str]] = [
    (r'def resolve_pending_action\(self, action_id: str, approved: bool\):',
     r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'),
    (r'def _cb_start_track\(self, user_data=None\):',
     r'def _cb_start_track(self, user_data: Any = None) -> None:'),
    (r'def _start_track_logic\(self, track_data\):',
     r'def _start_track_logic(self, track_data: dict[str, Any]) -> None:'),
    (r'def _cb_ticket_retry\(self, ticket_id\):',
     r'def _cb_ticket_retry(self, ticket_id: str) -> None:'),
    (r'def _cb_ticket_skip\(self, ticket_id\):',
     r'def _cb_ticket_skip(self, ticket_id: str) -> None:'),
    # NOTE(review): the replacement references a `Ticket` type — presumably
    # defined/imported in gui_2.py itself; confirm it is in scope there.
    (r'def _render_ticket_dag_node\(self, ticket, tickets_by_id, children_map, rendered\):',
     r'def _render_ticket_dag_node(self, ticket: Ticket, tickets_by_id: dict[str, Ticket], children_map: dict[str, list[str]], rendered: set[str]) -> None:'),
]
# ============================================================
# gui_legacy.py manual signatures (Tier 3 items)
# ============================================================
# Same shape as GUI2_MANUAL_SIGS: (regex for the exact current `def` line,
# fully annotated replacement line). Applied once each by apply_manual_sigs.
# Callback factories (`_make_*_cb`) return a bare `Callable`; widget
# callbacks take untyped `sender`/`app_data`, annotated as `Any`.
LEGACY_MANUAL_SIGS: list[tuple[str, str]] = [
    (r'def _add_kv_row\(parent: str, key: str, val, val_color=None\):',
     r'def _add_kv_row(parent: str, key: str, val: Any, val_color: tuple[int, int, int] | None = None) -> None:'),
    (r'def _make_remove_file_cb\(self, idx: int\):',
     r'def _make_remove_file_cb(self, idx: int) -> Callable:'),
    (r'def _make_remove_shot_cb\(self, idx: int\):',
     r'def _make_remove_shot_cb(self, idx: int) -> Callable:'),
    (r'def _make_remove_project_cb\(self, idx: int\):',
     r'def _make_remove_project_cb(self, idx: int) -> Callable:'),
    (r'def _make_switch_project_cb\(self, path: str\):',
     r'def _make_switch_project_cb(self, path: str) -> Callable:'),
    (r'def cb_word_wrap_toggled\(self, sender=None, app_data=None\):',
     r'def cb_word_wrap_toggled(self, sender: Any = None, app_data: Any = None) -> None:'),
    (r'def cb_provider_changed\(self, sender, app_data\):',
     r'def cb_provider_changed(self, sender: Any, app_data: Any) -> None:'),
    (r'def cb_model_changed\(self, sender, app_data\):',
     r'def cb_model_changed(self, sender: Any, app_data: Any) -> None:'),
    (r'def _cb_new_project_automated\(self, path\):',
     r'def _cb_new_project_automated(self, path: str) -> None:'),
    (r'def cb_disc_switch\(self, sender, app_data\):',
     r'def cb_disc_switch(self, sender: Any, app_data: Any) -> None:'),
    (r'def _make_disc_remove_role_cb\(self, idx: int\):',
     r'def _make_disc_remove_role_cb(self, idx: int) -> Callable:'),
    (r'def _cb_toggle_read\(self, sender, app_data, user_data\):',
     r'def _cb_toggle_read(self, sender: Any, app_data: Any, user_data: Any) -> None:'),
    (r'def _make_disc_role_cb\(self, idx: int\):',
     r'def _make_disc_role_cb(self, idx: int) -> Callable:'),
    (r'def _make_disc_content_cb\(self, idx: int\):',
     r'def _make_disc_content_cb(self, idx: int) -> Callable:'),
    (r'def _make_disc_insert_cb\(self, idx: int\):',
     r'def _make_disc_insert_cb(self, idx: int) -> Callable:'),
    (r'def _make_disc_remove_cb\(self, idx: int\):',
     r'def _make_disc_remove_cb(self, idx: int) -> Callable:'),
    (r'def _make_disc_toggle_cb\(self, idx: int\):',
     r'def _make_disc_toggle_cb(self, idx: int) -> Callable:'),
    (r'def cb_palette_changed\(self, sender, app_data\):',
     r'def cb_palette_changed(self, sender: Any, app_data: Any) -> None:'),
    (r'def cb_scale_changed\(self, sender, app_data\):',
     r'def cb_scale_changed(self, sender: Any, app_data: Any) -> None:'),
]
# ============================================================
# gui_2.py variable type annotations
# ============================================================
# (pattern, replacement) pairs that rewrite module-level assignments in
# gui_2.py into annotated form. Every pattern is anchored with ``^``, so
# these must be applied with re.MULTILINE (the driver uses
# apply_var_replacements_m for exactly that reason).
GUI2_VAR_REPLACEMENTS: list[tuple[str, str]] = [
    (r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '),
    (r'^PROVIDERS = ', 'PROVIDERS: list[str] = '),
    (r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '),
    # C_* color constants are all products of the vec4() helper.
    (r'^C_OUT = ', 'C_OUT: tuple[float, ...] = '),
    (r'^C_IN = ', 'C_IN: tuple[float, ...] = '),
    (r'^C_REQ = ', 'C_REQ: tuple[float, ...] = '),
    (r'^C_RES = ', 'C_RES: tuple[float, ...] = '),
    (r'^C_TC = ', 'C_TC: tuple[float, ...] = '),
    (r'^C_TR = ', 'C_TR: tuple[float, ...] = '),
    (r'^C_TRS = ', 'C_TRS: tuple[float, ...] = '),
    (r'^C_LBL = ', 'C_LBL: tuple[float, ...] = '),
    (r'^C_VAL = ', 'C_VAL: tuple[float, ...] = '),
    (r'^C_KEY = ', 'C_KEY: tuple[float, ...] = '),
    (r'^C_NUM = ', 'C_NUM: tuple[float, ...] = '),
    (r'^C_SUB = ', 'C_SUB: tuple[float, ...] = '),
    (r'^DIR_COLORS = ', 'DIR_COLORS: dict[str, tuple[float, ...]] = '),
    (r'^KIND_COLORS = ', 'KIND_COLORS: dict[str, tuple[float, ...]] = '),
    (r'^HEAVY_KEYS = ', 'HEAVY_KEYS: set[str] = '),
    (r'^DISC_ROLES = ', 'DISC_ROLES: list[str] = '),
    (r'^AGENT_TOOL_NAMES = ', 'AGENT_TOOL_NAMES: list[str] = '),
]
# ============================================================
# gui_legacy.py variable type annotations
# ============================================================
LEGACY_VAR_REPLACEMENTS = [
(r'^CONFIG_PATH = ', 'CONFIG_PATH: Path = '),
(r'^PROVIDERS = ', 'PROVIDERS: list[str] = '),
(r'^COMMS_CLAMP_CHARS = ', 'COMMS_CLAMP_CHARS: int = '),
(r'^_DIR_COLORS = \{', '_DIR_COLORS: dict[str, tuple[int, int, int]] = {'),
(r'^_KIND_COLORS = \{', '_KIND_COLORS: dict[str, tuple[int, int, int]] = {'),
(r'^_HEAVY_KEYS = ', '_HEAVY_KEYS: set[str] = '),
(r'^_LABEL_COLOR = ', '_LABEL_COLOR: tuple[int, int, int] = '),
(r'^_VALUE_COLOR = ', '_VALUE_COLOR: tuple[int, int, int] = '),
(r'^_KEY_COLOR = ', '_KEY_COLOR: tuple[int, int, int] = '),
(r'^_NUM_COLOR = ', '_NUM_COLOR: tuple[int, int, int] = '),
(r'^_SUBHDR_COLOR = ', '_SUBHDR_COLOR: tuple[int, int, int] = '),
(r'^_KIND_RENDERERS = \{', '_KIND_RENDERERS: dict[str, Callable] = {'),
(r'^DISC_ROLES = ', 'DISC_ROLES: list[str] = '),
(r'^ _next_id = ', ' _next_id: int = '),
]
if __name__ == "__main__":
    # Three-phase pipeline: (A) AST-driven `-> None` returns, (B) manual
    # regex signature rewrites, (C) variable annotations. Syntax is
    # re-verified after each phase; any failure aborts with exit code 1.
    print("=== Phase A: Auto-apply -> None (single-pass AST) ===")
    n = apply_return_none_single_pass("gui_2.py")
    stats["auto_none"] += n
    print(f" gui_2.py: {n} applied")
    n = apply_return_none_single_pass("gui_legacy.py")
    stats["auto_none"] += n
    print(f" gui_legacy.py: {n} applied")
    # Verify syntax after Phase A
    for f in ["gui_2.py", "gui_legacy.py"]:
        r = verify_syntax(f)
        if "Error" in r:
            print(f" ABORT: {r}")
            sys.exit(1)
    print(" Syntax OK after Phase A")
    print("\n=== Phase B: Manual signatures (regex) ===")
    n = apply_manual_sigs("gui_2.py", GUI2_MANUAL_SIGS)
    stats["manual_sig"] += n
    print(f" gui_2.py: {n} applied")
    n = apply_manual_sigs("gui_legacy.py", LEGACY_MANUAL_SIGS)
    stats["manual_sig"] += n
    print(f" gui_legacy.py: {n} applied")
    # Verify syntax after Phase B
    for f in ["gui_2.py", "gui_legacy.py"]:
        r = verify_syntax(f)
        if "Error" in r:
            print(f" ABORT: {r}")
            sys.exit(1)
    print(" Syntax OK after Phase B")
    print("\n=== Phase C: Variable annotations (regex) ===")
    # Use re.MULTILINE so ^ matches line starts
    def apply_var_replacements_m(filepath: str, replacements: list[tuple[str, str]]) -> int:
        """Apply ^-anchored declaration rewrites with re.MULTILINE.

        Returns the number of replacements applied; misses go to
        stats["errors"] just like the other apply_* helpers.
        """
        fp = abs_path(filepath)
        with open(fp, 'r', encoding='utf-8') as f:
            code = f.read()
        count = 0
        for pattern, replacement in replacements:
            new_code = re.sub(pattern, replacement, code, count=1, flags=re.MULTILINE)
            if new_code != code:
                code = new_code
                count += 1
            else:
                stats["errors"].append(f"var no match: {filepath}: {pattern[:60]}")
        with open(fp, 'w', encoding='utf-8', newline='') as f:
            f.write(code)
        return count
    n = apply_var_replacements_m("gui_2.py", GUI2_VAR_REPLACEMENTS)
    stats["vars"] += n
    print(f" gui_2.py: {n} applied")
    n = apply_var_replacements_m("gui_legacy.py", LEGACY_VAR_REPLACEMENTS)
    stats["vars"] += n
    print(f" gui_legacy.py: {n} applied")
    print("\n=== Final Syntax Verification ===")
    all_ok = True
    for f in ["gui_2.py", "gui_legacy.py"]:
        r = verify_syntax(f)
        print(f" {f}: {r}")
        if "Error" in r:
            all_ok = False
    print("\n=== Summary ===")
    print(f" Auto -> None: {stats['auto_none']}")
    print(f" Manual sigs: {stats['manual_sig']}")
    print(f" Variables: {stats['vars']}")
    print(f" Errors: {len(stats['errors'])}")
    if stats['errors']:
        print("\n=== Errors ===")
        for e in stats['errors']:
            print(f" {e}")
    if all_ok:
        print("\nAll files pass syntax check.")
    else:
        print("\nSYNTAX ERRORS DETECTED — review and fix before committing.")
        # Fix: previously the final verification only printed and the script
        # still exited 0, unlike Phases A/B which abort. Propagate failure
        # so commit hooks / CI see a non-zero status.
        sys.exit(1)