diff --git a/sloppy.py b/sloppy.py index 33c391ba..7c85ab0e 100644 --- a/sloppy.py +++ b/sloppy.py @@ -56,7 +56,7 @@ if __name__ == "__main__": runner_params = hello_imgui.RunnerParams() runner_params.app_window_params.window_title = "Manual Slop (Web)" runner_params.app_window_params.borderless = True - runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_dock_space + runner_params.imgui_window_params.default_imgui_window_type = hello_imgui.DefaultImGuiWindowType.provide_full_screen_docker_space runner_params.app_window_params.restore_previous_window_size = True with startup_profiler.phase("hello_imgui_run"): diff --git a/src/app_controller.py b/src/app_controller.py index 688fd19d..30852488 100644 --- a/src/app_controller.py +++ b/src/app_controller.py @@ -5,6 +5,7 @@ import inspect import json import os import re +import signal import sys import threading import time @@ -88,57 +89,6 @@ class ConfirmDialog: self._condition.wait(timeout=0.1) return self._approved, self._script -class MMAApprovalDialog: - def __init__(self, ticket_id: str, payload: str) -> None: - """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] - """ - self._payload = payload - self._condition = threading.Condition() - self._done = False - self._approved = False - - def wait(self) -> tuple[bool, str]: - """ - [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] - """ - start_time = time.time() - with self._condition: - while not self._done: - if time.time() - start_time > 120: - return False, self._payload - self._condition.wait(timeout=0.1) - return self._approved, self._payload - -class MMASpawnApprovalDialog: - def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: - """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] - """ - self._prompt = prompt - self._context_md = context_md - self._condition = threading.Condition() - self._done = False - self._approved = False - self._abort = False - - def wait(self) -> dict[str, Any]: - """ - [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] - """ - start_time = time.time() - with self._condition: - while not self._done: - if time.time() - start_time > 120: - return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md} - self._condition.wait(timeout=0.1) - return { - 'approved': self._approved, - 'abort': self._abort, - 'prompt': self._prompt, - 'context_md': self._context_md - } - #region: API Handlers async def _api_get_key(controller: 'AppController', header_key: str) -> str: """ @@ -794,6 +744,39 @@ def _handle_hide_patch_modal(controller: 'AppController', task: dict): #endregion +def _install_sigint_exit_handler(controller: 'AppController') -> None: + """ + + Install a SIGINT handler that drains the controller's I/O pool + (wait=False) and calls ``os._exit(0)``. This sidesteps the broken + Python interpreter finalization chain that hangs the process when + Ctrl+C is pressed while a worker is mid-task in user code + (e.g. a long-running Gemini/Anthropic HTTP request). + Background: ``ThreadPoolExecutor.__del__`` -> ``shutdown(wait=True)`` + joins all worker threads; atexit handlers do not fire reliably in + that scenario, so the interpreter never reaches the pool-shutdown + path. Bypassing finalization with ``os._exit(0)`` is the only + reliable fix. + [SDM: src/app_controller.py:_install_sigint_exit_handler] + Best-effort: ``signal.signal`` may fail with ``ValueError`` on + non-main threads (e.g. some conftest warmup paths). The failure + is swallowed because production (main thread) is the only case + that matters; the conftest's own atexit fix at commit 8957c9a5 + covers the test fixture's normal-exit path. + [C: src/app_controller.py:AppController.__init__] + """ + def _on_sigint(signum: int, frame: Any) -> None: + try: + controller._io_pool.shutdown(wait=False) + except Exception: + pass + os._exit(0) + try: + signal.signal(signal.SIGINT, _on_sigint) + except (ValueError, OSError): + pass + + class AppController: """ @@ -830,6 +813,7 @@ class AppController: # --- Shared background pool + proactive warmup (startup_speedup_20260606) --- self._io_pool = make_io_pool() + _install_sigint_exit_handler(self) # Warmup progress is a diagnostic; keep stderr quiet unless explicitly asked. # Explicit log_to_stderr arg wins; otherwise default to the SLOP_WARMUP_DEBUG env flag. if log_to_stderr is None: @@ -1542,25 +1526,25 @@ class AppController: def _init_actions(self) -> None: # Set up state-related action maps self._clickable_actions: dict[str, Callable[..., Any]] = { - 'btn_reset': self._handle_reset_session, - 'btn_gen_send': self._handle_generate_send, - 'btn_md_only': self._handle_md_only, - 'btn_approve_script': self._handle_approve_script, - 'btn_reject_script': self._handle_reject_script, - 'btn_project_save': self._cb_project_save, - 'btn_disc_create': self._cb_disc_create, - 'btn_mma_plan_epic': self._cb_plan_epic, - 'btn_mma_accept_tracks': self._cb_accept_tracks, - 'btn_mma_start_track': self._cb_start_track, - 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), - 'btn_approve_tool': self._handle_approve_ask, - 'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True), - 'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True), - 'btn_prune_logs': self.cb_prune_logs, - 'btn_reset_base_prompt': self._cb_reset_base_prompt, + 'btn_reset': self._handle_reset_session, + 'btn_gen_send': self._handle_generate_send, + 'btn_md_only': self._handle_md_only, + 'btn_approve_script': self._handle_approve_script, + 'btn_reject_script': self._handle_reject_script, + 'btn_project_save': self._cb_project_save, + 'btn_disc_create': self._cb_disc_create, + 'btn_mma_plan_epic': self._cb_plan_epic, + 'btn_mma_accept_tracks': self._cb_accept_tracks, + 'btn_mma_start_track': self._cb_start_track, + 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), + 'btn_approve_tool': self._handle_approve_ask, + 'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True), + 'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True), + 'btn_prune_logs': self.cb_prune_logs, + 'btn_reset_base_prompt': self._cb_reset_base_prompt, 'btn_show_base_prompt_diff': self._cb_show_base_prompt_diff, - 'btn_rebuild_rag_index': self._rebuild_rag_index, - 'btn_clear_summary_cache': self._handle_clear_summary_cache, + 'btn_rebuild_rag_index': self._rebuild_rag_index, + 'btn_clear_summary_cache': self._handle_clear_summary_cache, } self._drag_actions: dict[str, Callable[..., Any]] = {} self._right_clickable_actions: dict[str, Callable[..., Any]] = {} @@ -1587,48 +1571,6 @@ class AppController: else: ai_client._gemini_cli_adapter.binary_path = str(path) - def _set_rag_status(self, status: str) -> None: - """Thread-safe update of rag_status via the GUI task queue.""" - with self._pending_gui_tasks_lock: - self._pending_gui_tasks.append({ - "action": "set_value", - "item": "rag_status", - "value": status - }) - - def _rebuild_rag_index(self) -> None: - """Background thread that re-indexes all files in the current project.""" - if not self.rag_config or not self.rag_config.enabled or not self.rag_engine: - return - - def _run(): - try: - self._set_rag_status("indexing...") - import concurrent.futures - - # 1. Incremental indexing of current files in parallel - with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: - futures = [] - def do_index(p): - if self.rag_engine: self.rag_engine.index_file(p) - for f in self.files: - path = f.path if hasattr(f, "path") else str(f) - futures.append(executor.submit(do_index, path)) - concurrent.futures.wait(futures) - - # 2. Cleanup stale entries (files no longer tracked) - indexed_paths = self.rag_engine.get_all_indexed_paths() - current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files} - stale_paths = [p for p in indexed_paths if p not in current_paths] - if stale_paths: - self.rag_engine.delete_documents_by_path(stale_paths) - - self._set_rag_status("ready") - except Exception as e: - self._set_rag_status(f"error: {e}") - - self.submit_io(_run) - def _trigger_gui_refresh(self): with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'set_comms_dirty'}) @@ -2106,7 +2048,6 @@ class AppController: self._trigger_gui_refresh() self.ai_status = f"viewing prior session: {session_dir.name} ({len(entries)} entries)" - def cb_exit_prior_session(self): """ [C: src/gui_2.py:App._render_comms_history_panel, src/gui_2.py:App._render_prior_session_view] @@ -2187,6 +2128,21 @@ class AppController: self._refresh_from_project() self._configure_mcp_for_project() + def inject_context(self, data: dict) -> None: + """ + Programmatic context injection. + [C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation] + """ + file_path = data.get("file_path") + if file_path: + if not os.path.isabs(file_path): + file_path = os.path.relpath(file_path, self.active_project_root) + existing = next((f for f in self.files if (f.path if hasattr(f, "path") else str(f)) == file_path), None) + if not existing: + item = models.FileItem(path=file_path) + self.files.append(item) + self._refresh_from_project() + def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" @@ -2203,7 +2159,7 @@ class AppController: def _fetch_models(self, provider: str) -> None: """ - [C: src/gui_2.py:App.run] + [C: src/gui_2.py:App.run] """ # In the desktop GUI, model listing imports the provider SDKs (the same # ~2s C-extension load warmup pays for). Defer it until the first frame is @@ -2236,10 +2192,8 @@ class AppController: def start_services(self, app: Any = None): """ - - - Starts background threads. - [C: src/gui_2.py:App.__init__] + Starts background threads. + [C: src/gui_2.py:App.__init__] """ self._prune_old_logs() self._init_ai_and_hooks(app) @@ -2248,9 +2202,8 @@ class AppController: def _compute_warmup_list(self) -> list[str]: """ - - Returns the list of modules to warm on the _io_pool at startup. - [SDM: src/app_controller.py:_compute_warmup_list] + Returns the list of modules to warm on the _io_pool at startup. + [SDM: src/app_controller.py:_compute_warmup_list] """ modules: list[str] = [ "google.genai", @@ -2348,10 +2301,8 @@ class AppController: def shutdown(self) -> None: """ - - - Stops background threads and cleans up resources. - [C: src/gui_2.py:App.run, src/gui_2.py:App.shutdown, tests/conftest.py:app_instance, tests/conftest.py:mock_app] + Stops background threads and cleans up resources. + [C: src/gui_2.py:App.run, src/gui_2.py:App.shutdown, tests/conftest.py:app_instance, tests/conftest.py:mock_app] """ from src import ai_client ai_client.cleanup() @@ -2456,10 +2407,8 @@ class AppController: def _handle_request_event(self, event: events.UserRequestEvent) -> None: """ - - - Processes a UserRequestEvent by calling the AI client. - [C: tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_rag_integration.py:test_rag_integration] + Processes a UserRequestEvent by calling the AI client. + [C: tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_rag_integration.py:test_rag_integration] """ self.ai_status = 'sending...' @@ -2536,127 +2485,6 @@ class AppController: except Exception as e: self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}) - def _offload_entry_payload(self, entry: Dict[str, Any]) -> Dict[str, Any]: - optimized = copy.deepcopy(entry) - kind = optimized.get("kind") - payload = optimized.get("payload", {}) - if kind == "tool_result" and "output" in payload: - output = payload["output"] - ref_path = session_logger.log_tool_output(output) - if ref_path: - filename = Path(ref_path).name - payload["output"] = f"[REF:{filename}]" - if kind == "tool_call" and "script" in payload: - script = payload["script"] - ref_path = session_logger.log_tool_call(script, "LOG_ONLY", None) - if ref_path: - filename = Path(ref_path).name - payload["script"] = f"[REF:{filename}]" - return optimized - - def _on_ai_stream(self, text: str) -> None: - """Handles streaming text from the AI.""" - self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}) - - def _on_comms_entry(self, entry: Dict[str, Any]) -> None: - """ - [C: tests/test_app_controller_offloading.py:test_on_comms_entry_tool_result_offloading] - """ - optimized_entry = self._offload_entry_payload(entry) - session_logger.log_comms(optimized_entry) - entry["local_ts"] = time.time() - kind = entry.get("kind") - payload = entry.get("payload", {}) - - if kind == "response" and "usage" in payload: - u = payload["usage"] - inp = u.get("input_tokens") or u.get("prompt_tokens") or 0 - out = u.get("output_tokens") or u.get("completion_tokens") or 0 - cache_read = u.get("cache_read_input_tokens") or 0 - cache_create = u.get("cache_creation_input_tokens") or 0 - total = u.get("total_tokens") or 0 - - # Store normalized usage back in payload for history rendering - u["input_tokens"] = inp - u["output_tokens"] = out - u["cache_read_input_tokens"] = cache_read - - self.session_usage["input_tokens"] += inp - self.session_usage["output_tokens"] += out - self.session_usage["cache_read_input_tokens"] += cache_read - self.session_usage["cache_creation_input_tokens"] += cache_create - self.session_usage["total_tokens"] += total - input_t = u.get("input_tokens") or 0 - output_t = u.get("output_tokens") or 0 - model = payload.get("model", "unknown") - self._token_history.append({ - "time": time.time(), - "input": input_t, - "output": output_t, - "model": model - }) - - if kind == "request": - if self.ui_auto_add_history: - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": "User", - "content": payload.get("message", ""), - "collapsed": payload.get("collapsed", False), - "ts": entry.get("ts", project_manager.now_ts()) - }) - - if kind == "response": - if self.ui_auto_add_history: - role = payload.get("role", "AI") - text_content = payload.get("text", "") - if text_content.strip(): - segments, parsed_response = thinking_parser.parse_thinking_trace(text_content) - entry_obj = { - "role": role, - "content": parsed_response.strip() if parsed_response else "", - "collapsed": True, - "ts": entry.get("ts", project_manager.now_ts()) - } - if "usage" in payload: - entry_obj["usage"] = payload["usage"] - if segments: - entry_obj["thinking_segments"] = [{"content": s.content, "marker": s.marker} for s in segments] - - if entry_obj["content"] or segments: - with self._pending_history_adds_lock: - self._pending_history_adds.append(entry_obj) - - if kind in ("tool_result", "tool_call"): - if self.ui_auto_add_history: - role = "Tool" if kind == "tool_result" else "Vendor API" - content = "" - if kind == "tool_result": - content = payload.get("output", "") - else: - content = payload.get("script") or payload.get("args") or payload.get("message", "") - if isinstance(content, dict): - content = json.dumps(content, indent=1) - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": role, - "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", - "collapsed": True, - "ts": entry.get("ts", project_manager.now_ts()) - }) - if kind == "history_add": - payload = entry.get("payload", {}) - with self._pending_history_adds_lock: - self._pending_history_adds.append({ - "role": payload.get("role", "AI"), - "content": payload.get("content", ""), - "collapsed": payload.get("collapsed", False), - "ts": entry.get("ts", project_manager.now_ts()) - }) - return - with self._pending_comms_lock: - self._pending_comms.append(entry) - def _on_tool_log(self, script: str, result: str) -> None: """ [C: tests/test_app_controller_offloading.py:test_on_tool_log_offloading] @@ -2667,125 +2495,6 @@ class AppController: with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) - def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None: - """ - [C: tests/test_gui_updates.py:test_gui_updates_on_event] - """ - payload = kwargs.get("payload", {}) - # Push to background event queue, NOT GUI queue - self.event_queue.put("refresh_api_metrics", payload) - if self.test_hooks_enabled: - with self._api_event_queue_lock: - self._api_event_queue.append({"type": event_name, "payload": payload}) - - def _on_performance_alert(self, message: str) -> None: - self.diagnostic_log.append({ - "ts": project_manager.now_ts(), - "message": message, - "type": "performance" - }) - - def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]: - """ - [C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch] - """ - if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): - self.ai_status = "running powershell..." - output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) - self.ai_status = "powershell done, awaiting AI..." - return output - dialog = ConfirmDialog(script, base_dir) - is_headless = "--headless" in sys.argv - if is_headless: - with self._pending_dialog_lock: - self._pending_actions[dialog._uid] = dialog - else: - with self._pending_dialog_lock: - self._pending_dialog = dialog - if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): - with self._api_event_queue_lock: - self._api_event_queue.append({ - "type": "script_confirmation_required", - "action_id": dialog._uid, - "script": str(script), - "base_dir": str(base_dir), - "ts": time.time() - }) - approved, final_script = dialog.wait() - if is_headless: - with self._pending_dialog_lock: - if dialog._uid in self._pending_actions: - del self._pending_actions[dialog._uid] - if not approved: - self._append_tool_log(final_script, "REJECTED by user") - return None - self.ai_status = "running powershell..." - output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) - self._append_tool_log(final_script, output) - self.ai_status = "powershell done, awaiting AI..." - return output - - def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None: - """ - [C: tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_has_source_tier, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_keys, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_stores_dict] - """ - self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) - tool_name = self._extract_tool_name(script) - is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower() - if tool_name: - if tool_name not in self._tool_stats: - self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0} - self._tool_stats[tool_name]["count"] += 1 - self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms - if is_failure: - self._tool_stats[tool_name]["failures"] += 1 - self.ui_last_script_text = script - self.ui_last_script_output = result - self._trigger_script_blink = True - self.show_script_output = True - if self.ui_auto_scroll_tool_calls: - self._scroll_tool_calls_to_bottom = True - - def _extract_tool_name(self, script: str) -> str: - if not script: - return "unknown" - script_lower = script.lower() - if "powershell" in script_lower or "run_powershell" in script_lower: - return "run_powershell" - if "read_file" in script_lower: - return "read_file" - if "write_file" in script_lower or "write" in script_lower: - return "write_file" - if "list_directory" in script_lower or "ls" in script_lower: - return "list_directory" - if "search_files" in script_lower or "glob" in script_lower: - return "search_files" - if "web_search" in script_lower: - return "web_search" - if "fetch_url" in script_lower: - return "fetch_url" - if "py_get" in script_lower: - return "py_get_skeleton" - return "other" - - def resolve_pending_action(self, action_id: str, approved: bool) -> bool: - with self._pending_dialog_lock: - if action_id in self._pending_actions: - dialog = self._pending_actions[action_id] - with dialog._condition: - dialog._approved = approved - dialog._done = True - dialog._condition.notify_all() - return True - elif self._pending_dialog and self._pending_dialog._uid == action_id: - dialog = self._pending_dialog - with dialog._condition: - dialog._approved = approved - dialog._done = True - dialog._condition.notify_all() - return True - return False - @property def _pending_mma_spawn(self) -> Optional[Dict[str, Any]]: return self._pending_mma_spawns[0] if self._pending_mma_spawns else None @@ -2918,6 +2627,339 @@ class AppController: return _api_token_stats(self) return api + def _cb_reset_base_prompt(self, user_data=None) -> None: + """ + [C: src/gui_2.py:App._render_system_prompts_panel] + """ + self.ui_base_system_prompt = ai_client._SYSTEM_PROMPT + self.ui_use_default_base_prompt = False + + def _handle_clear_summary_cache(self, user_data: Any = None) -> None: + self.summary_cache.clear() + self.ai_status = 'summary cache cleared' + + def _cb_show_base_prompt_diff(self, user_data=None) -> None: + """ + [C: src/gui_2.py:App._render_system_prompts_panel] + """ + self._show_base_prompt_diff_modal = True + + def clear_last_error(self) -> None: + """Reset last_error after a successful response cycle. + [C: src/vendor_state.py:get_vendor_state] + """ + self.last_error = None + + #region: Layout + + def _cb_save_workspace_profile(self, name: str, scope: str = 'project') -> None: + """ + [C: src/gui_2.py:App._render_save_workspace_profile_modal] + """ + if not hasattr(self, '_app') or not self._app: + return + profile = self._app._capture_workspace_profile(name) + self.workspace_manager.save_profile(profile, scope=scope) + self.workspace_profiles = self.workspace_manager.load_all_profiles() + self._app.workspace_profiles = self.workspace_profiles + + def _cb_delete_workspace_profile(self, name: str, scope: str = 'project') -> None: + """ + [C: src/gui_2.py:App._show_menus] + """ + self.workspace_manager.delete_profile(name, scope=scope) + self.workspace_profiles = self.workspace_manager.load_all_profiles() + if hasattr(self, '_app') and self._app: + self._app.workspace_profiles = self.workspace_profiles + + def _cb_load_workspace_profile(self, name: str) -> None: + """ + [C: src/gui_2.py:App._show_menus] + """ + if name in self.workspace_profiles: + profile = self.workspace_profiles[name] + if hasattr(self, '_app') and self._app: + self._app._apply_workspace_profile(profile) + + #endregion: Layout + + #region: Serialization + + def _flush_to_project(self) -> None: + """ + [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._show_menus, tests/test_view_presets.py:test_save_view_preset] + """ + proj = self.project + proj.setdefault("output", {})["output_dir"] = self.ui_output_dir + proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir + proj["files"]["paths"] = self.files + proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir + proj["screenshots"]["paths"] = self.screenshots + proj.setdefault("project", {}) + proj["project"]["git_dir"] = self.ui_project_git_dir + proj.setdefault("conductor", {})["dir"] = self.ui_project_conductor_dir + proj["project"]["system_prompt"] = self.ui_project_system_prompt + proj["project"]["active_preset"] = self.ui_project_preset_name + proj["project"]["word_wrap"] = self.ui_word_wrap + proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms + proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls + proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path + proj.setdefault("agent", {}).setdefault("tools", {}) + for t_name in models.AGENT_TOOL_NAMES: + proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) + self._flush_disc_entries_to_project() + disc_sec = proj.setdefault("discussion", {}) + disc_sec["roles"] = self.disc_roles + disc_sec["active"] = self.active_discussion + disc_sec["auto_add"] = self.ui_auto_add_history + proj["view_presets"] = [vp.to_dict() for vp in self.view_presets] + # Save MMA State + mma_sec = proj.setdefault("mma", {}) + mma_sec["epic"] = self.ui_epic_input + mma_sec["tier_models"] = {t: {"model": d["model"], "provider": d.get("provider", "gemini"), "tool_preset": d.get("tool_preset")} for t, d in self.mma_tier_usage.items()} + if self.active_track: + mma_sec["active_track"] = asdict(self.active_track) + else: + mma_sec["active_track"] = None + + cleaned_proj = project_manager.clean_nones(proj) + project_manager.save_project(cleaned_proj, self.active_project_path) + + def _flush_to_config(self) -> None: + """ + [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._render_theme_panel, src/gui_2.py:App._show_menus, tests/test_system_prompt_exposure.py:TestSystemPromptExposure.test_app_controller_flush_saves_prompts] + """ + self.config["ai"] = { + "provider": self.current_provider, + "model": self.current_model, + "temperature": self.temperature, + "top_p": self.top_p, + "max_tokens": self.max_tokens, + "history_trunc_limit": self.history_trunc_limit, + "active_preset": self.ui_global_preset_name, + } + self.config["ai"]["system_prompt"] = self.ui_global_system_prompt + self.config["ai"]["base_system_prompt"] = self.ui_base_system_prompt + self.config["ai"]["use_default_base_prompt"] = self.ui_use_default_base_prompt + + if self.rag_config: + self.config["rag"] = self.rag_config.to_dict() + + self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} + from src import bg_shader + # Update gui section while preserving other keys like bg_shader_enabled + gui_cfg = self.config.get("gui", {}) + gui_cfg.update({ + "show_windows": self.show_windows, + "separate_message_panel": getattr(self, "ui_separate_message_panel", False), + "separate_response_panel": getattr(self, "ui_separate_response_panel", False), + "separate_tool_calls_panel": getattr(self, "ui_separate_tool_calls_panel", False), + "separate_external_tools": getattr(self, "ui_separate_external_tools", False), + "separate_task_dag": self.ui_separate_task_dag, + "separate_usage_analytics": self.ui_separate_usage_analytics, + "separate_tier1": self.ui_separate_tier1, + "separate_tier2": self.ui_separate_tier2, + "separate_tier3": self.ui_separate_tier3, + "separate_tier4": self.ui_separate_tier4, + "bg_shader_enabled": bg_shader.get_bg().enabled + }) + self.config["gui"] = gui_cfg + + # Explicitly save theme state into the config dict + theme.save_to_config(self.config) + + #endregion: Serialization + + #region: Diagnostics + + def _on_performance_alert(self, message: str) -> None: + self.diagnostic_log.append({ + "ts": project_manager.now_ts(), + "message": message, + "type": "performance" + }) + + #endregion: Diagnostics + + #region: Usage Analytics + + def get_session_insights(self) -> Dict[str, Any]: + """ + [C: src/gui_2.py:App._render_session_insights_panel] + """ + from src import cost_tracker + total_input = sum(e["input"] for e in self._token_history) + total_output = sum(e["output"] for e in self._token_history) + total_tokens = total_input + total_output + elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0 + burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0 + session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output) + completed = sum(1 for t in self.active_tickets if t.get("status") == "complete") + efficiency = total_tokens / completed if completed > 0 else 0 + return { + "total_tokens": total_tokens, + "total_input": total_input, + "total_output": total_output, + "elapsed_min": elapsed_min, + "burn_rate": burn_rate, + "session_cost": session_cost, + "completed_tickets": completed, + "efficiency": efficiency, + "call_count": len(self._token_history) + } + + def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: + """ + [C: tests/test_gui_updates.py:test_telemetry_data_updates_correctly] + """ + if "latency" in payload: + self.session_usage["last_latency"] = payload["latency"] + if "usage" in payload and "percentage" in payload["usage"]: + self.session_usage["percentage"] = payload["usage"]["percentage"] + self._recalculate_session_usage() + if md_content is not None: + stats = ai_client.get_token_stats(md_content) + # Ensure compatibility if keys are named differently + if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: + stats["estimated_prompt_tokens"] = stats["total_tokens"] + self._token_stats = stats + cache_stats = payload.get("cache_stats") + if cache_stats: + count = cache_stats.get("cache_count", 0) + size_bytes = cache_stats.get("total_size_bytes", 0) + self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" + quota = payload.get("vendor_quota") + if isinstance(quota, dict) and quota: + self.vendor_quota = quota + if "error" in payload and isinstance(payload["error"], dict): + self.last_error = payload["error"] + self._update_cached_stats() + + def set_vendor_quota(self, provider: str, remaining_pct: float, reset_at: str = "") -> None: + """Update vendor quota state from a quota-bearing API response. + [C: src/vendor_state.py:get_vendor_state] + """ + self.vendor_quota = {"provider": provider, "remaining_pct": remaining_pct, "reset_at": reset_at} + + def _recalculate_session_usage(self) -> None: + usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0, "percentage": self.session_usage.get("percentage", 0.0)} + for entry in ai_client.get_comms_log(): + if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): + u = entry["payload"]["usage"] + for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: + if k in usage: + usage[k] += u.get(k, 0) or 0 + self.session_usage = usage + # Update cached files list + stats = ai_client.get_gemini_cache_stats() + self._cached_files = stats.get("cached_files", []) + + #endregion: Usage Analytics + + #region: Context + + def _cb_clear_summary_cache(self, user_data=None) -> None: + """ + [C: src/gui_2.py:App._render_files_panel] + """ + from src import summarize + summarize._summary_cache.clear() + self._push_mma_state_update() + + def save_context_preset(self, preset: models.ContextPreset) -> None: + self.context_preset_manager.save_preset(self.project, preset) + self._save_active_project() + + def load_context_preset(self, name: str) -> models.ContextPreset: + presets = self.context_preset_manager.load_all(self.project) + if name not in presets: + raise KeyError(f"Context preset '{name}' not found.") + preset = presets[name] + + # Update only temporary context state, not project files + import copy + self.context_files = [] + for f in preset.files: + fi = models.FileItem(path=f.path, view_mode=f.view_mode) + fi.custom_slices = copy.deepcopy(f.custom_slices) if hasattr(f, 'custom_slices') else [] + fi.ast_mask = copy.deepcopy(f.ast_mask) if hasattr(f, 'ast_mask') else {} + fi.ast_signatures = getattr(f, 'ast_signatures', False) + fi.ast_definitions = getattr(f, 'ast_definitions', False) + self.context_files.append(fi) + + self.screenshots = list(preset.screenshots) + return preset + + def _update_cached_stats(self) -> None: + from src import ai_client + self._cached_cache_stats = ai_client.get_gemini_cache_stats() + self._cached_tool_stats = dict(self._tool_stats) + + def clear_cache(self) -> None: + """ + [C: src/gui_2.py:App._render_cache_panel] + """ + from src import ai_client + ai_client.cleanup() + self._update_cached_stats() + + #endregion: Context + + #region: RAG + + def _set_rag_status(self, status: str) -> None: + """Thread-safe update of rag_status via the GUI task queue.""" + with self._pending_gui_tasks_lock: + self._pending_gui_tasks.append({ + "action": "set_value", + "item": "rag_status", + "value": status + }) + + def _rebuild_rag_index(self) -> None: + """Background thread that re-indexes all files in the current project.""" + if not self.rag_config or not self.rag_config.enabled or not self.rag_engine: + return + + def _run(): + try: + self._set_rag_status("indexing...") + import concurrent.futures + + # 1. Incremental indexing of current files in parallel + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures = [] + def do_index(p): + if self.rag_engine: self.rag_engine.index_file(p) + for f in self.files: + path = f.path if hasattr(f, "path") else str(f) + futures.append(executor.submit(do_index, path)) + concurrent.futures.wait(futures) + + # 2. Cleanup stale entries (files no longer tracked) + indexed_paths = self.rag_engine.get_all_indexed_paths() + current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files} + stale_paths = [p for p in indexed_paths if p not in current_paths] + if stale_paths: + self.rag_engine.delete_documents_by_path(stale_paths) + + self._set_rag_status("ready") + except Exception as e: + self._set_rag_status(f"error: {e}") + + self.submit_io(_run) + + #endregion: RAG + + #region: Project + + def _configure_mcp_for_project(self) -> None: + if not self.active_project_path: + return + project_root = Path(self.active_project_path).parent + file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files] + mcp_client.configure(file_items_as_dicts, [str(project_root)]) + def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem @@ -2933,81 +2975,6 @@ class AppController: models.save_config(self.config) self.ai_status = "config saved" - def _cb_reset_base_prompt(self, user_data=None) -> None: - """ - [C: src/gui_2.py:App._render_system_prompts_panel] - """ - self.ui_base_system_prompt = ai_client._SYSTEM_PROMPT - self.ui_use_default_base_prompt = False - - def _cb_clear_summary_cache(self, user_data=None) -> None: - """ - [C: src/gui_2.py:App._render_files_panel] - """ - from src import summarize - summarize._summary_cache.clear() - self._push_mma_state_update() - - def _handle_clear_summary_cache(self, user_data: Any = None) -> None: - self.summary_cache.clear() - self.ai_status = 'summary cache cleared' - - def _cb_show_base_prompt_diff(self, user_data=None) -> None: - """ - [C: src/gui_2.py:App._render_system_prompts_panel] - """ - self._show_base_prompt_diff_modal = True - - def _cb_disc_create(self) -> None: - nm = self.ui_disc_new_name_input.strip() - if nm: - self._create_discussion(nm) - self.ui_disc_new_name_input = "" - - def _configure_mcp_for_project(self) -> None: - if not self.active_project_path: - return - project_root = Path(self.active_project_path).parent - file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files] - mcp_client.configure(file_items_as_dicts, [str(project_root)]) - - def is_project_stale(self) -> bool: - """True when a project switch is queued or running; UI should tint - to signal the controller state lags the user's last click.""" - with self._project_switch_lock: - if self._project_switch_in_progress: - return True - pending = self._project_switch_pending_path - if pending and pending != self.active_project_path: - return True - return False - - def _switch_project(self, path: str) -> None: - """ - [C: src/gui_2.py:App._render_projects_panel] - - Non-blocking: returns immediately, marks the controller as stale, - and runs the actual save/load work in a background thread so the - render loop keeps drawing and lightweight UI interactions (scrolling, - selecting tabs) remain responsive. - """ - if path == self.active_project_path and not self.is_project_stale(): - return - if not Path(path).exists(): - self.ai_status = f"project file not found: {path}" - return - with self._project_switch_lock: - if self._project_switch_in_progress: - if self._project_switch_pending_path == path: - return - self._project_switch_pending_path = path - self.ai_status = f"switch queued: {Path(path).stem} (waiting on {Path(self._project_switch_pending_path or '').stem})" - return - self._project_switch_in_progress = True - self._project_switch_pending_path = path - self.ai_status = f"switching to: {Path(path).stem} (stale ui - ops disabled)" - self.submit_io(self._do_project_switch, path) - def _do_project_switch(self, path: str) -> None: try: self._flush_to_project() @@ -3039,6 +3006,32 @@ class AppController: if pending and pending != self.active_project_path and Path(pending).exists(): self._switch_project(pending) + def _switch_project(self, path: str) -> None: + """ + [C: src/gui_2.py:App._render_projects_panel] + + Non-blocking: returns immediately, marks the controller as stale, + and runs the actual save/load work in a background thread so the + render loop keeps drawing and lightweight UI interactions (scrolling, + selecting tabs) remain responsive. + """ + if path == self.active_project_path and not self.is_project_stale(): + return + if not Path(path).exists(): + self.ai_status = f"project file not found: {path}" + return + with self._project_switch_lock: + if self._project_switch_in_progress: + if self._project_switch_pending_path == path: + return + self._project_switch_pending_path = path + self.ai_status = f"switch queued: {Path(path).stem} (waiting on {Path(self._project_switch_pending_path or '').stem})" + return + self._project_switch_in_progress = True + self._project_switch_pending_path = path + self.ai_status = f"switching to: {Path(path).stem} (stale ui - ops disabled)" + self.submit_io(self._do_project_switch, path) + def _refresh_from_project(self) -> None: # Deserialize FileItems in files.paths """ @@ -3144,34 +3137,31 @@ class AppController: if self.rag_config and self.rag_config.enabled: self._rebuild_rag_index() - def _cb_save_workspace_profile(self, name: str, scope: str = 'project') -> None: - """ - [C: src/gui_2.py:App._render_save_workspace_profile_modal] - """ - if not hasattr(self, '_app') or not self._app: - return - profile = self._app._capture_workspace_profile(name) - self.workspace_manager.save_profile(profile, scope=scope) - self.workspace_profiles = self.workspace_manager.load_all_profiles() - self._app.workspace_profiles = self.workspace_profiles + def is_project_stale(self) -> bool: + """True when a project switch is queued or running; UI should tint + to signal the controller state lags the user's last click.""" + with self._project_switch_lock: + if self._project_switch_in_progress: + return True + pending = self._project_switch_pending_path + if pending and pending != self.active_project_path: + return True + return False - def _cb_delete_workspace_profile(self, name: str, scope: str = 'project') -> None: + def _save_active_project(self) -> None: """ - [C: src/gui_2.py:App._show_menus] + [C: src/gui_2.py:App.delete_context_preset, src/gui_2.py:App.save_context_preset] """ - self.workspace_manager.delete_profile(name, scope=scope) - self.workspace_profiles = self.workspace_manager.load_all_profiles() - if hasattr(self, '_app') and self._app: - self._app.workspace_profiles = self.workspace_profiles + if self.active_project_path: + try: + cleaned = project_manager.clean_nones(self.project) + project_manager.save_project(cleaned, self.active_project_path) + except Exception as e: + self.ai_status = f"save error: {e}" - def _cb_load_workspace_profile(self, name: str) -> None: - """ - [C: src/gui_2.py:App._show_menus] - """ - if name in self.workspace_profiles: - profile = self.workspace_profiles[name] - if hasattr(self, '_app') and self._app: - self._app._apply_workspace_profile(profile) + #endregion: Project + + #region: AI Settings def _apply_preset(self, name: str, scope: str) -> None: """ @@ -3288,73 +3278,15 @@ class AppController: self.view_presets = [vp for vp in self.view_presets if vp.name != name] self._flush_to_project() - def save_context_preset(self, preset: models.ContextPreset) -> None: - self.context_preset_manager.save_preset(self.project, preset) - self._save_active_project() + #endregion: AI Settings - def load_context_preset(self, name: str) -> models.ContextPreset: - presets = self.context_preset_manager.load_all(self.project) - if name not in presets: - raise KeyError(f"Context preset '{name}' not found.") - preset = presets[name] + #region: Discusssion - # Update only temporary context state, not project files - import copy - self.context_files = [] - for f in preset.files: - fi = models.FileItem(path=f.path, view_mode=f.view_mode) - fi.custom_slices = copy.deepcopy(f.custom_slices) if hasattr(f, 'custom_slices') else [] - fi.ast_mask = copy.deepcopy(f.ast_mask) if hasattr(f, 'ast_mask') else {} - fi.ast_signatures = getattr(f, 'ast_signatures', False) - fi.ast_definitions = getattr(f, 'ast_definitions', False) - self.context_files.append(fi) - - self.screenshots = list(preset.screenshots) - return preset - def _cb_load_track(self, track_id: str) -> None: - """ - [C: src/gui_2.py:App._render_mma_track_browser] - """ - state = project_manager.load_track_state(track_id, self.active_project_root) - if state: - try: - # Convert list[Ticket] or list[dict] to list[Ticket] for Track object - tickets = [] - for t in state.tasks: - if isinstance(t, dict): - tickets.append(models.Ticket(**t)) - else: - tickets.append(t) - self.active_track = models.Track( - id=state.metadata.id, - description=state.metadata.name, - tickets=tickets - ) - # Keep dicts for UI table - self._load_active_tickets() - # Load track-scoped history - history = project_manager.load_track_history(track_id, self.active_project_root) - with self._disc_entries_lock: - if history: - self.disc_entries[:] = models.parse_history_entries(history, self.disc_roles) - else: - self.disc_entries.clear() - self._recalculate_session_usage() - self.ai_status = f"Loaded track: {state.metadata.name}" - except Exception as e: - self.ai_status = f"Load track error: {e}" - print(f"Error loading track {track_id}: {e}") - - def _save_active_project(self) -> None: - """ - [C: src/gui_2.py:App.delete_context_preset, src/gui_2.py:App.save_context_preset] - """ - if self.active_project_path: - try: - cleaned = project_manager.clean_nones(self.project) - project_manager.save_project(cleaned, self.active_project_path) - except Exception as e: - self.ai_status = f"save error: {e}" + def _cb_disc_create(self) -> None: + nm = self.ui_disc_new_name_input.strip() + if nm: + self._create_discussion(nm) + self.ui_disc_new_name_input = "" def _get_discussion_names(self) -> list[str]: """ @@ -3407,103 +3339,8 @@ class AppController: disc_data["sent_markdown"] = getattr(self, "discussion_sent_markdown", "") disc_data["sent_system_prompt"] = getattr(self, "discussion_sent_system_prompt", "") - def _create_discussion(self, name: str) -> None: - """ - [C: src/gui_2.py:App._render_discussion_metadata, src/gui_2.py:App._render_synthesis_panel, src/gui_2.py:App._render_takes_panel] - """ - disc_sec = self.project.setdefault("discussion", {}) - discussions = disc_sec.setdefault("discussions", {}) - if name in discussions: - self.ai_status = f"discussion '{name}' already exists" - return - new_disc = project_manager.default_discussion() - # Inherit context from current session if available - if self.context_files: - new_disc["context_snapshot"] = [f.to_dict() if hasattr(f, 'to_dict') else f for f in self.context_files] - discussions[name] = new_disc - self._switch_discussion(name) - - def _branch_discussion(self, index: int) -> None: - """ - [C: src/gui_2.py:App._render_discussion_entry] - """ - self._flush_disc_entries_to_project() - # Generate a unique branch name - base_name = self.active_discussion.split("_take_")[0] - counter = 1 - new_name = f"{base_name}_take_{counter}" - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - while new_name in discussions: - counter += 1 - new_name = f"{base_name}_take_{counter}" - - project_manager.branch_discussion(self.project, self.active_discussion, new_name, index) - self._switch_discussion(new_name) - def _rename_discussion(self, old_name: str, new_name: str) -> None: - """ - [C: src/gui_2.py:App._render_discussion_metadata] - """ - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if old_name not in discussions: - return - if new_name in discussions: - self.ai_status = f"discussion '{new_name}' already exists" - return - discussions[new_name] = discussions.pop(old_name) - if self.active_discussion == old_name: - self.active_discussion = new_name - disc_sec["active"] = new_name - - def _delete_discussion(self, name: str) -> None: - """ - [C: src/gui_2.py:App._render_discussion_metadata] - """ - disc_sec = self.project.get("discussion", {}) - discussions = disc_sec.get("discussions", {}) - if len(discussions) <= 1: - self.ai_status = "cannot delete the last discussion" - return - if name not in discussions: - return - del discussions[name] - if self.active_discussion == name: - remaining = sorted(discussions.keys()) - self._switch_discussion(remaining[0]) - - def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: - """ - [C: src/gui_2.py:App._handle_approve_mma_step, src/gui_2.py:App._handle_approve_spawn, src/gui_2.py:App._render_mma_modals] - """ - if self._pending_mma_approvals: - task = self._pending_mma_approvals.pop(0) - dlg = task.get("dialog_container", [None])[0] - if dlg: - with dlg._condition: - dlg._approved = approved - if payload is not None: - dlg._payload = payload - dlg._done = True - dlg._condition.notify_all() - elif self._pending_mma_spawns: - task = self._pending_mma_spawns.pop(0) - spawn_dlg = task.get("dialog_container", [None])[0] - if spawn_dlg: - with spawn_dlg._condition: - spawn_dlg._approved = approved - spawn_dlg._abort = abort - if prompt is not None: - spawn_dlg._prompt = prompt - if context_md is not None: - spawn_dlg._context_md = context_md - spawn_dlg._done = True - spawn_dlg._condition.notify_all() - def _handle_approve_ask(self) -> None: """ - - Responds with approval for a pending /api/ask request. [C: src/gui_2.py:App._handle_approve_ask, src/gui_2.py:App._render_mma_modals] """ @@ -3526,8 +3363,6 @@ class AppController: def _handle_reject_ask(self) -> None: """ - - Responds with rejection for a pending /api/ask request. [C: src/gui_2.py:App._render_mma_modals] """ @@ -3550,8 +3385,6 @@ class AppController: def _handle_reset_session(self) -> None: """ - - Logic for resetting the AI session and GUI state. [C: src/gui_2.py:App._render_message_panel] """ @@ -3604,32 +3437,6 @@ class AppController: self.top_p = 1.0 self.max_tokens = 8192 - def _handle_md_only(self) -> None: - """ - - - Logic for the 'MD Only' action. - [C: src/gui_2.py:App._render_message_panel] - """ - if self.is_project_stale(): - self.ai_status = "project switch in progress; MD generation disabled" - return - - def worker(): - """ - [C: tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] - """ - try: - md, path, *_ = self._do_generate() - self.last_md = md - self.last_md_path = path - self.ai_status = f"md written: {path.name}" - # Refresh token budget metrics with CURRENT md - self._refresh_api_metrics({}, md_content=md) - except Exception as e: - self.ai_status = f"error: {e}" - self.submit_io(worker) - def _handle_compress_discussion(self) -> None: def worker(): try: @@ -3654,8 +3461,6 @@ class AppController: def _handle_generate_send(self) -> None: """ - - Logic for the 'Gen + Send' action. [C: src/gui_2.py:App._render_message_panel, src/gui_2.py:App._render_synthesis_panel, src/gui_2.py:App._render_takes_panel, tests/test_gui_events_v2.py:test_handle_generate_send_pushes_event, tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] """ @@ -3691,185 +3496,10 @@ class AppController: self.ai_status = f"generate error: {e}" self.submit_io(worker) - def _recalculate_session_usage(self) -> None: - usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0, "percentage": self.session_usage.get("percentage", 0.0)} - for entry in ai_client.get_comms_log(): - if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): - u = entry["payload"]["usage"] - for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: - if k in usage: - usage[k] += u.get(k, 0) or 0 - self.session_usage = usage - # Update cached files list - stats = ai_client.get_gemini_cache_stats() - self._cached_files = stats.get("cached_files", []) - - def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: - """ - [C: tests/test_gui_updates.py:test_telemetry_data_updates_correctly] - """ - if "latency" in payload: - self.session_usage["last_latency"] = payload["latency"] - if "usage" in payload and "percentage" in payload["usage"]: - self.session_usage["percentage"] = payload["usage"]["percentage"] - self._recalculate_session_usage() - if md_content is not None: - stats = ai_client.get_token_stats(md_content) - # Ensure compatibility if keys are named differently - if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: - stats["estimated_prompt_tokens"] = stats["total_tokens"] - self._token_stats = stats - cache_stats = payload.get("cache_stats") - if cache_stats: - count = cache_stats.get("cache_count", 0) - size_bytes = cache_stats.get("total_size_bytes", 0) - self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" - quota = payload.get("vendor_quota") - if isinstance(quota, dict) and quota: - self.vendor_quota = quota - if "error" in payload and isinstance(payload["error"], dict): - self.last_error = payload["error"] - self._update_cached_stats() - - def set_vendor_quota(self, provider: str, remaining_pct: float, reset_at: str = "") -> None: - """Update vendor quota state from a quota-bearing API response. - [C: src/vendor_state.py:get_vendor_state] - """ - self.vendor_quota = {"provider": provider, "remaining_pct": remaining_pct, "reset_at": reset_at} - - def clear_last_error(self) -> None: - """Reset last_error after a successful response cycle. - [C: src/vendor_state.py:get_vendor_state] - """ - self.last_error = None - - def _update_cached_stats(self) -> None: - from src import ai_client - self._cached_cache_stats = ai_client.get_gemini_cache_stats() - self._cached_tool_stats = dict(self._tool_stats) - - def clear_cache(self) -> None: - """ - [C: src/gui_2.py:App._render_cache_panel] - """ - from src import ai_client - ai_client.cleanup() - self._update_cached_stats() - - def get_session_insights(self) -> Dict[str, Any]: - """ - [C: src/gui_2.py:App._render_session_insights_panel] - """ - from src import cost_tracker - total_input = sum(e["input"] for e in self._token_history) - total_output = sum(e["output"] for e in self._token_history) - total_tokens = total_input + total_output - elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0 - burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0 - session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output) - completed = sum(1 for t in self.active_tickets if t.get("status") == "complete") - efficiency = total_tokens / completed if completed > 0 else 0 - return { - "total_tokens": total_tokens, - "total_input": total_input, - "total_output": total_output, - "elapsed_min": elapsed_min, - "burn_rate": burn_rate, - "session_cost": session_cost, - "completed_tickets": completed, - "efficiency": efficiency, - "call_count": len(self._token_history) - } - - def _flush_to_project(self) -> None: - """ - [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._show_menus, tests/test_view_presets.py:test_save_view_preset] - """ - proj = self.project - proj.setdefault("output", {})["output_dir"] = self.ui_output_dir - proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir - proj["files"]["paths"] = self.files - proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir - proj["screenshots"]["paths"] = self.screenshots - proj.setdefault("project", {}) - proj["project"]["git_dir"] = self.ui_project_git_dir - proj.setdefault("conductor", {})["dir"] = self.ui_project_conductor_dir - proj["project"]["system_prompt"] = self.ui_project_system_prompt - proj["project"]["active_preset"] = self.ui_project_preset_name - proj["project"]["word_wrap"] = self.ui_word_wrap - proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms - proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls - proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path - proj.setdefault("agent", {}).setdefault("tools", {}) - for t_name in models.AGENT_TOOL_NAMES: - proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) - self._flush_disc_entries_to_project() - disc_sec = proj.setdefault("discussion", {}) - disc_sec["roles"] = self.disc_roles - disc_sec["active"] = self.active_discussion - disc_sec["auto_add"] = self.ui_auto_add_history - proj["view_presets"] = [vp.to_dict() for vp in self.view_presets] - # Save MMA State - mma_sec = proj.setdefault("mma", {}) - mma_sec["epic"] = self.ui_epic_input - mma_sec["tier_models"] = {t: {"model": d["model"], "provider": d.get("provider", "gemini"), "tool_preset": d.get("tool_preset")} for t, d in self.mma_tier_usage.items()} - if self.active_track: - mma_sec["active_track"] = asdict(self.active_track) - else: - mma_sec["active_track"] = None - - cleaned_proj = project_manager.clean_nones(proj) - project_manager.save_project(cleaned_proj, self.active_project_path) - - def _flush_to_config(self) -> None: - """ - [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._render_theme_panel, src/gui_2.py:App._show_menus, tests/test_system_prompt_exposure.py:TestSystemPromptExposure.test_app_controller_flush_saves_prompts] - """ - self.config["ai"] = { - "provider": self.current_provider, - "model": self.current_model, - "temperature": self.temperature, - "top_p": self.top_p, - "max_tokens": self.max_tokens, - "history_trunc_limit": self.history_trunc_limit, - "active_preset": self.ui_global_preset_name, - } - self.config["ai"]["system_prompt"] = self.ui_global_system_prompt - self.config["ai"]["base_system_prompt"] = self.ui_base_system_prompt - self.config["ai"]["use_default_base_prompt"] = self.ui_use_default_base_prompt - - if self.rag_config: - self.config["rag"] = self.rag_config.to_dict() - - self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} - from src import bg_shader - # Update gui section while preserving other keys like bg_shader_enabled - gui_cfg = self.config.get("gui", {}) - gui_cfg.update({ - "show_windows": self.show_windows, - "separate_message_panel": getattr(self, "ui_separate_message_panel", False), - "separate_response_panel": getattr(self, "ui_separate_response_panel", False), - "separate_tool_calls_panel": getattr(self, "ui_separate_tool_calls_panel", False), - "separate_external_tools": getattr(self, "ui_separate_external_tools", False), - "separate_task_dag": self.ui_separate_task_dag, - "separate_usage_analytics": self.ui_separate_usage_analytics, - "separate_tier1": self.ui_separate_tier1, - "separate_tier2": self.ui_separate_tier2, - "separate_tier3": self.ui_separate_tier3, - "separate_tier4": self.ui_separate_tier4, - "bg_shader_enabled": bg_shader.get_bg().enabled - }) - self.config["gui"] = gui_cfg - - # Explicitly save theme state into the config dict - theme.save_to_config(self.config) - def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: """ - - - Returns (full_md, output_path, file_items, stable_md, discussion_text). - [C: src/gui_2.py:App._show_menus, tests/test_context_composition_decoupled.py:test_do_generate_uses_context_files, tests/test_tiered_aggregation.py:test_app_controller_do_generate_uses_persona_strategy] + Returns (full_md, output_path, file_items, stable_md, discussion_text). + [C: src/gui_2.py:App._show_menus, tests/test_context_composition_decoupled.py:test_do_generate_uses_context_files, tests/test_tiered_aggregation.py:test_app_controller_do_generate_uses_persona_strategy] """ self._flush_to_project() self._flush_to_config() @@ -3913,6 +3543,337 @@ class AppController: return full_md, path, file_items, stable_md, discussion_text + def _handle_md_only(self) -> None: + """ + Logic for the 'MD Only' action. + [C: src/gui_2.py:App._render_message_panel] + """ + if self.is_project_stale(): + self.ai_status = "project switch in progress; MD generation disabled" + return + + def worker(): + """ + [C: tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] + """ + try: + md, path, *_ = self._do_generate() + self.last_md = md + self.last_md_path = path + self.ai_status = f"md written: {path.name}" + # Refresh token budget metrics with CURRENT md + self._refresh_api_metrics({}, md_content=md) + except Exception as e: + self.ai_status = f"error: {e}" + self.submit_io(worker) + + def _create_discussion(self, name: str) -> None: + """ + [C: src/gui_2.py:App._render_discussion_metadata, src/gui_2.py:App._render_synthesis_panel, src/gui_2.py:App._render_takes_panel] + """ + disc_sec = self.project.setdefault("discussion", {}) + discussions = disc_sec.setdefault("discussions", {}) + if name in discussions: + self.ai_status = f"discussion '{name}' already exists" + return + new_disc = project_manager.default_discussion() + # Inherit context from current session if available + if self.context_files: + new_disc["context_snapshot"] = [f.to_dict() if hasattr(f, 'to_dict') else f for f in self.context_files] + discussions[name] = new_disc + self._switch_discussion(name) + + def _branch_discussion(self, index: int) -> None: + """ + [C: src/gui_2.py:App._render_discussion_entry] + """ + self._flush_disc_entries_to_project() + # Generate a unique branch name + base_name = self.active_discussion.split("_take_")[0] + counter = 1 + new_name = f"{base_name}_take_{counter}" + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + while new_name in discussions: + counter += 1 + new_name = f"{base_name}_take_{counter}" + + project_manager.branch_discussion(self.project, self.active_discussion, new_name, index) + self._switch_discussion(new_name) + + def _rename_discussion(self, old_name: str, new_name: str) -> None: + """ + [C: src/gui_2.py:App._render_discussion_metadata] + """ + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if old_name not in discussions: + return + if new_name in discussions: + self.ai_status = f"discussion '{new_name}' already exists" + return + discussions[new_name] = discussions.pop(old_name) + if self.active_discussion == old_name: + self.active_discussion = new_name + disc_sec["active"] = new_name + + def _delete_discussion(self, name: str) -> None: + """ + [C: src/gui_2.py:App._render_discussion_metadata] + """ + disc_sec = self.project.get("discussion", {}) + discussions = disc_sec.get("discussions", {}) + if len(discussions) <= 1: + self.ai_status = "cannot delete the last discussion" + return + if name not in discussions: + return + del discussions[name] + if self.active_discussion == name: + remaining = sorted(discussions.keys()) + self._switch_discussion(remaining[0]) + + #endregion: Discusssion + +#region: Operations + + def _offload_entry_payload(self, entry: Dict[str, Any]) -> Dict[str, Any]: + optimized = copy.deepcopy(entry) + kind = optimized.get("kind") + payload = optimized.get("payload", {}) + if kind == "tool_result" and "output" in payload: + output = payload["output"] + ref_path = session_logger.log_tool_output(output) + if ref_path: + filename = Path(ref_path).name + payload["output"] = f"[REF:{filename}]" + if kind == "tool_call" and "script" in payload: + script = payload["script"] + ref_path = session_logger.log_tool_call(script, "LOG_ONLY", None) + if ref_path: + filename = Path(ref_path).name + payload["script"] = f"[REF:{filename}]" + return optimized + + def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None: + """ + [C: tests/test_gui_updates.py:test_gui_updates_on_event] + """ + payload = kwargs.get("payload", {}) + # Push to background event queue, NOT GUI queue + self.event_queue.put("refresh_api_metrics", payload) + if self.test_hooks_enabled: + with self._api_event_queue_lock: + self._api_event_queue.append({"type": event_name, "payload": payload}) + + def _on_ai_stream(self, text: str) -> None: + """Handles streaming text from the AI.""" + self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}) + + def _on_comms_entry(self, entry: Dict[str, Any]) -> None: + """ + [C: tests/test_app_controller_offloading.py:test_on_comms_entry_tool_result_offloading] + """ + optimized_entry = self._offload_entry_payload(entry) + session_logger.log_comms(optimized_entry) + entry["local_ts"] = time.time() + kind = entry.get("kind") + payload = entry.get("payload", {}) + + if kind == "response" and "usage" in payload: + u = payload["usage"] + inp = u.get("input_tokens") or u.get("prompt_tokens") or 0 + out = u.get("output_tokens") or u.get("completion_tokens") or 0 + cache_read = u.get("cache_read_input_tokens") or 0 + cache_create = u.get("cache_creation_input_tokens") or 0 + total = u.get("total_tokens") or 0 + + # Store normalized usage back in payload for history rendering + u["input_tokens"] = inp + u["output_tokens"] = out + u["cache_read_input_tokens"] = cache_read + + self.session_usage["input_tokens"] += inp + self.session_usage["output_tokens"] += out + self.session_usage["cache_read_input_tokens"] += cache_read + self.session_usage["cache_creation_input_tokens"] += cache_create + self.session_usage["total_tokens"] += total + input_t = u.get("input_tokens") or 0 + output_t = u.get("output_tokens") or 0 + model = payload.get("model", "unknown") + self._token_history.append({ + "time": time.time(), + "input": input_t, + "output": output_t, + "model": model + }) + + if kind == "request": + if self.ui_auto_add_history: + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": "User", + "content": payload.get("message", ""), + "collapsed": payload.get("collapsed", False), + "ts": entry.get("ts", project_manager.now_ts()) + }) + + if kind == "response": + if self.ui_auto_add_history: + role = payload.get("role", "AI") + text_content = payload.get("text", "") + if text_content.strip(): + segments, parsed_response = thinking_parser.parse_thinking_trace(text_content) + entry_obj = { + "role": role, + "content": parsed_response.strip() if parsed_response else "", + "collapsed": True, + "ts": entry.get("ts", project_manager.now_ts()) + } + if "usage" in payload: + entry_obj["usage"] = payload["usage"] + if segments: + entry_obj["thinking_segments"] = [{"content": s.content, "marker": s.marker} for s in segments] + + if entry_obj["content"] or segments: + with self._pending_history_adds_lock: + self._pending_history_adds.append(entry_obj) + + if kind in ("tool_result", "tool_call"): + if self.ui_auto_add_history: + role = "Tool" if kind == "tool_result" else "Vendor API" + content = "" + if kind == "tool_result": + content = payload.get("output", "") + else: + content = payload.get("script") or payload.get("args") or payload.get("message", "") + if isinstance(content, dict): + content = json.dumps(content, indent=1) + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": role, + "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", + "collapsed": True, + "ts": entry.get("ts", project_manager.now_ts()) + }) + if kind == "history_add": + payload = entry.get("payload", {}) + with self._pending_history_adds_lock: + self._pending_history_adds.append({ + "role": payload.get("role", "AI"), + "content": payload.get("content", ""), + "collapsed": payload.get("collapsed", False), + "ts": entry.get("ts", project_manager.now_ts()) + }) + return + with self._pending_comms_lock: + self._pending_comms.append(entry) + + def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None: + """ + [C: tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_has_source_tier, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_keys, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_stores_dict] + """ + self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) + tool_name = self._extract_tool_name(script) + is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower() + if tool_name: + if tool_name not in self._tool_stats: + self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0} + self._tool_stats[tool_name]["count"] += 1 + self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms + if is_failure: + self._tool_stats[tool_name]["failures"] += 1 + self.ui_last_script_text = script + self.ui_last_script_output = result + self._trigger_script_blink = True + self.show_script_output = True + if self.ui_auto_scroll_tool_calls: + self._scroll_tool_calls_to_bottom = True + + def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]: + """ + [C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch] + """ + if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): + self.ai_status = "running powershell..." + output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) + self.ai_status = "powershell done, awaiting AI..." + return output + dialog = ConfirmDialog(script, base_dir) + is_headless = "--headless" in sys.argv + if is_headless: + with self._pending_dialog_lock: + self._pending_actions[dialog._uid] = dialog + else: + with self._pending_dialog_lock: + self._pending_dialog = dialog + if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): + with self._api_event_queue_lock: + self._api_event_queue.append({ + "type": "script_confirmation_required", + "action_id": dialog._uid, + "script": str(script), + "base_dir": str(base_dir), + "ts": time.time() + }) + approved, final_script = dialog.wait() + if is_headless: + with self._pending_dialog_lock: + if dialog._uid in self._pending_actions: + del self._pending_actions[dialog._uid] + if not approved: + self._append_tool_log(final_script, "REJECTED by user") + return None + self.ai_status = "running powershell..." + output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) + self._append_tool_log(final_script, output) + self.ai_status = "powershell done, awaiting AI..." + return output + + def resolve_pending_action(self, action_id: str, approved: bool) -> bool: + with self._pending_dialog_lock: + if action_id in self._pending_actions: + dialog = self._pending_actions[action_id] + with dialog._condition: + dialog._approved = approved + dialog._done = True + dialog._condition.notify_all() + return True + elif self._pending_dialog and self._pending_dialog._uid == action_id: + dialog = self._pending_dialog + with dialog._condition: + dialog._approved = approved + dialog._done = True + dialog._condition.notify_all() + return True + return False + + def _extract_tool_name(self, script: str) -> str: + if not script: + return "unknown" + script_lower = script.lower() + if "powershell" in script_lower or "run_powershell" in script_lower: + return "run_powershell" + if "read_file" in script_lower: + return "read_file" + if "write_file" in script_lower or "write" in script_lower: + return "write_file" + if "list_directory" in script_lower or "ls" in script_lower: + return "list_directory" + if "search_files" in script_lower or "glob" in script_lower: + return "search_files" + if "web_search" in script_lower: + return "web_search" + if "fetch_url" in script_lower: + return "fetch_url" + if "py_get" in script_lower: + return "py_get_skeleton" + return "other" + +#endregion: Operations + + #region MMA (Controller) + def _cb_plan_epic(self) -> None: """ [C: src/gui_2.py:App._render_mma_epic_planner, tests/test_mma_orchestration_gui.py:test_cb_plan_epic_launches_thread] @@ -4153,10 +4114,8 @@ class AppController: def kill_worker(self, worker_id: str) -> None: """ - - - Aborts a running worker. - [C: src/gui_2.py:App._cb_kill_ticket, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread] + Aborts a running worker. + [C: src/gui_2.py:App._cb_kill_ticket, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread] """ engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: @@ -4176,23 +4135,6 @@ class AppController: if engine: engine.resume() - def inject_context(self, data: dict) -> None: - """ - - - Programmatic context injection. - [C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation] - """ - file_path = data.get("file_path") - if file_path: - if not os.path.isabs(file_path): - file_path = os.path.relpath(file_path, self.active_project_root) - existing = next((f for f in self.files if (f.path if hasattr(f, "path") else str(f)) == file_path), None) - if not existing: - item = models.FileItem(path=file_path) - self.files.append(item) - self._refresh_from_project() - def approve_ticket(self, ticket_id: str) -> None: """Manually approves a ticket for execution.""" engine = self.engines.get(self.active_track.id if self.active_track else None) @@ -4285,54 +4227,121 @@ class AppController: # Refresh tracks from disk self.tracks = project_manager.get_all_tracks(self.active_project_root) - def _push_mma_state_update(self) -> None: + def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: """ - [C: src/gui_2.py:App._cb_block_ticket, src/gui_2.py:App._cb_unblock_ticket, src/gui_2.py:App._render_mma_ticket_editor, src/gui_2.py:App._render_task_dag_panel, src/gui_2.py:App._render_ticket_queue, src/gui_2.py:App._reorder_ticket, src/gui_2.py:App.bulk_block, src/gui_2.py:App.bulk_execute, src/gui_2.py:App.bulk_skip, tests/test_gui_phase4.py:test_push_mma_state_update] + [C: src/gui_2.py:App._handle_approve_mma_step, src/gui_2.py:App._handle_approve_spawn, src/gui_2.py:App._render_mma_modals] """ - if not self.active_track: - return - # Sync active_tickets (list of dicts) back to active_track.tickets (list of models.Ticket objects) - self.active_track.tickets = [models.Ticket.from_dict(t) for t in self.active_tickets] - # Save the state to disk - existing = project_manager.load_track_state(self.active_track.id, self.active_project_root) - meta = models.Metadata( - id=self.active_track.id, - name=self.active_track.description, - status=self.mma_status, - created_at=existing.metadata.created_at if existing else datetime.now(), - updated_at=datetime.now() - ) - state = models.TrackState( - metadata=meta, - discussion=existing.discussion if existing else [], - tasks=self.active_track.tickets - ) - project_manager.save_track_state(self.active_track.id, state, self.active_project_root) + if self._pending_mma_approvals: + task = self._pending_mma_approvals.pop(0) + dlg = task.get("dialog_container", [None])[0] + if dlg: + with dlg._condition: + dlg._approved = approved + if payload is not None: + dlg._payload = payload + dlg._done = True + dlg._condition.notify_all() + elif self._pending_mma_spawns: + task = self._pending_mma_spawns.pop(0) + spawn_dlg = task.get("dialog_container", [None])[0] + if spawn_dlg: + with spawn_dlg._condition: + spawn_dlg._approved = approved + spawn_dlg._abort = abort + if prompt is not None: + spawn_dlg._prompt = prompt + if context_md is not None: + spawn_dlg._context_md = context_md + spawn_dlg._done = True + spawn_dlg._condition.notify_all() - def _load_active_tickets(self) -> None: + def _cb_load_track(self, track_id: str) -> None: """ - - - Populates self.active_tickets based on the current execution mode. - [C: tests/test_gui_dag_beads.py:test_load_active_tickets_from_beads] + [C: src/gui_2.py:App._render_mma_track_browser] """ - if getattr(self, "ui_project_execution_mode", "native") == "beads": - from src import beads_client - bclient = beads_client.BeadsClient(Path(self.active_project_root)) - beads = bclient.list_beads() - self.active_tickets = [] - for b in beads: - self.active_tickets.append({ - "id": b.id, - "title": b.title, - "description": b.description, - "status": b.status, - "assigned_to": "tier3-worker", - "target_file": "", - "depends_on": [] - }) - else: - if self.active_track: - self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in self.active_track.tickets] - else: - self.active_tickets = [] \ No newline at end of file + state = project_manager.load_track_state(track_id, self.active_project_root) + if state: + try: + # Convert list[Ticket] or list[dict] to list[Ticket] for Track object + tickets = [] + for t in state.tasks: + if isinstance(t, dict): + tickets.append(models.Ticket(**t)) + else: + tickets.append(t) + self.active_track = models.Track( + id=state.metadata.id, + description=state.metadata.name, + tickets=tickets + ) + # Keep dicts for UI table + self._load_active_tickets() + # Load track-scoped history + history = project_manager.load_track_history(track_id, self.active_project_root) + with self._disc_entries_lock: + if history: + self.disc_entries[:] = models.parse_history_entries(history, self.disc_roles) + else: + self.disc_entries.clear() + self._recalculate_session_usage() + self.ai_status = f"Loaded track: {state.metadata.name}" + except Exception as e: + self.ai_status = f"Load track error: {e}" + print(f"Error loading track {track_id}: {e}") + + #endregion: MMA (Controller) + +#region: MMA + +class MMAApprovalDialog: + def __init__(self, ticket_id: str, payload: str) -> None: + """ + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + """ + self._payload = payload + self._condition = threading.Condition() + self._done = False + self._approved = False + + def wait(self) -> tuple[bool, str]: + """ + [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] + """ + start_time = time.time() + with self._condition: + while not self._done: + if time.time() - start_time > 120: + return False, self._payload + self._condition.wait(timeout=0.1) + return self._approved, self._payload + +class MMASpawnApprovalDialog: + def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: + """ + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + """ + self._prompt = prompt + self._context_md = context_md + self._condition = threading.Condition() + self._done = False + self._approved = False + self._abort = False + + def wait(self) -> dict[str, Any]: + """ + [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] + """ + start_time = time.time() + with self._condition: + while not self._done: + if time.time() - start_time > 120: + return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md} + self._condition.wait(timeout=0.1) + return { + 'approved': self._approved, + 'abort': self._abort, + 'prompt': self._prompt, + 'context_md': self._context_md + } + +#endregion: MMA diff --git a/src/gui_2.py b/src/gui_2.py index 1adf6454..cfd45aa8 100644 --- a/src/gui_2.py +++ b/src/gui_2.py @@ -60,7 +60,14 @@ class _LazyModule: if self._attr_name is None: self._cached = mod else: - self._cached = getattr(mod, self._attr_name) + try: + self._cached = getattr(mod, self._attr_name) + except AttributeError: + sub_mod_name = f"{self._module_name}.{self._attr_name}" + try: + self._cached = _importlib.import_module(sub_mod_name) + except (ImportError, ModuleNotFoundError): + self._cached = _FiledialogStub() return self._cached def __getattr__(self, name: str) -> _Any: @@ -69,6 +76,22 @@ class _LazyModule: def __call__(self, *args: _Any, **kwargs: _Any) -> _Any: return self._resolve()(*args, **kwargs) +class _FiledialogStub: + """No-op replacement for tkinter.filedialog on Python installs where + the Tcl/Tk runtime is missing (e.g. embedded Python, slim Docker images). + All dialog functions return safe empty sentinels so call sites that do + `if p and p not in app.x: app.x.append(p)` treat a missing dialog as a + no-op. Exposes a `available` flag so the UI can detect the stub and + offer an ImGui-based path input as an alternative. + [C: src/gui_2.py:_LazyModule._resolve] + """ + available: bool = False + def askopenfilename(self, *args: _Any, **kwargs: _Any) -> str: return "" + def askopenfilenames(self, *args: _Any, **kwargs: _Any) -> tuple: return () + def askdirectory(self, *args: _Any, **kwargs: _Any) -> str: return "" + def asksaveasfilename(self, *args: _Any, **kwargs: _Any) -> str: return "" + + # Heavy modules that were previously top-level imports (now lazy): np = _LazyModule("numpy") # was: import numpy as np filedialog = _LazyModule("tkinter", "filedialog") # was: from tkinter import filedialog diff --git a/src/io_pool.py b/src/io_pool.py index 51c56d15..f05395c1 100644 --- a/src/io_pool.py +++ b/src/io_pool.py @@ -1,3 +1,19 @@ +"""Shared AppController I/O pool factory. + +Historical note: an earlier revision of this module registered an +``atexit.register(pool.shutdown, wait=False)`` handler here, mirroring +the conftest fix at commit 8957c9a5. That approach was reverted because +it does not solve the Ctrl+C hang in ``sloppy.py`` when a worker is +mid-task (e.g. a long-running Gemini/Anthropic HTTP request): atexit +handlers do not fire at all in that scenario, so the process still hangs +in ``ThreadPoolExecutor.__del__`` -> ``shutdown(wait=True)`` during +finalization. + +The production fix lives in ``AppController.__init__`` as a SIGINT +handler that drains the pool and calls ``os._exit(0)``, sidestepping +the broken finalization chain. See commit log for details. +""" + from concurrent.futures import ThreadPoolExecutor diff --git a/src/mcp_client.py b/src/mcp_client.py index f68f5224..dfbad2af 100644 --- a/src/mcp_client.py +++ b/src/mcp_client.py @@ -134,16 +134,14 @@ def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | Non def _is_allowed(path: Path) -> bool: """ - + Return True if `path` is within the allowlist. + A path is allowed if: + - it is explicitly in _allowed_paths, OR + - it is contained within (or equal to) one of the _base_dirs + All paths are resolved (follows symlinks) before comparison to prevent + symlink-based path traversal. - Return True if `path` is within the allowlist. - A path is allowed if: - - it is explicitly in _allowed_paths, OR - - it is contained within (or equal to) one of the _base_dirs - All paths are resolved (follows symlinks) before comparison to prevent - symlink-based path traversal. - - CRITICAL: Blacklisted files (history) are NEVER allowed. + CRITICAL: Blacklisted files (history) are NEVER allowed. [C: tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_history_management.py:test_mcp_blacklist] """ from src.paths import get_config_path, get_credentials_path @@ -181,10 +179,8 @@ def _is_allowed(path: Path) -> bool: def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]: """ - - - Resolve raw_path and verify it passes the allowlist check. - Returns (resolved_path, error_string). error_string is empty on success. + Resolve raw_path and verify it passes the allowlist check. + Returns (resolved_path, error_string). error_string is empty on success. """ try: p = Path(raw_path) @@ -202,49 +198,10 @@ def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]: return p, "" # ------------------------------------------------------------------ tool implementations -def read_file(path: str) -> str: - """Return the UTF-8 content of a file, or an error string.""" - p, err = _resolve_and_check(path) - if err or p is None: - return err - if not p.exists(): return f"ERROR: file not found: {path}" - if not p.is_file(): return f"ERROR: not a file: {path}" - try: - return p.read_text(encoding="utf-8") - except Exception as e: - return f"ERROR reading '{path}': {e}" - -def list_directory(path: str) -> str: - """List entries in a directory. Returns a compact text table.""" - p, err = _resolve_and_check(path) - if err or p is None: - return err - if not p.exists(): return f"ERROR: path not found: {path}" - if not p.is_dir(): return f"ERROR: not a directory: {path}" - try: - entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) - lines = [f"Directory: {p}", ""] - count = 0 - for entry in entries: - # Blacklist check - name = entry.name.lower() - if name == "history.toml" or name.endswith("_history.toml"): - continue - kind = "file" if entry.is_file() else "dir " - size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else "" - lines.append(f" [{kind}] {entry.name:<40} {size}") - count += 1 - lines.append(f" ({count} entries)") - return "\n".join(lines) - except Exception as e: - return f"ERROR listing '{path}': {e}" - def search_files(path: str, pattern: str) -> str: """ - - - Search for files matching a glob pattern within path. - pattern examples: '*.py', '**/*.toml', 'src/**/*.rs' + Search for files matching a glob pattern within path. + pattern examples: '*.py', '**/*.toml', 'src/**/*.rs' """ p, err = _resolve_and_check(path) if err or p is None: @@ -271,13 +228,79 @@ def search_files(path: str, pattern: str) -> str: except Exception as e: return f"ERROR searching '{path}': {e}" +def list_directory(path: str) -> str: + """List entries in a directory. Returns a compact text table.""" + p, err = _resolve_and_check(path) + if err or p is None: + return err + if not p.exists(): return f"ERROR: path not found: {path}" + if not p.is_dir(): return f"ERROR: not a directory: {path}" + try: + entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) + lines = [f"Directory: {p}", ""] + count = 0 + for entry in entries: + # Blacklist check + name = entry.name.lower() + if name == "history.toml" or name.endswith("_history.toml"): + continue + kind = "file" if entry.is_file() else "dir " + size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else "" + lines.append(f" [{kind}] {entry.name:<40} {size}") + count += 1 + lines.append(f" ({count} entries)") + return "\n".join(lines) + except Exception as e: + return f"ERROR listing '{path}': {e}" + +def read_file(path: str) -> str: + """Return the UTF-8 content of a file, or an error string.""" + p, err = _resolve_and_check(path) + if err or p is None: + return err + if not p.exists(): return f"ERROR: file not found: {path}" + if not p.is_file(): return f"ERROR: not a file: {path}" + try: + return p.read_text(encoding="utf-8") + except Exception as e: + return f"ERROR reading '{path}': {e}" + +def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: + """ + Replace exact string match in a file. Preserves indentation and line endings. + Drop-in replacement for native edit tool that destroys 1-space indentation. + """ + p, err = _resolve_and_check(path) + if err: + return err + assert p is not None + if not p.exists(): + return f"ERROR: file not found: {path}" + if not old_string: + return "ERROR: old_string cannot be empty" + try: + content = p.read_text(encoding="utf-8") + if old_string not in content: + return f"ERROR: old_string not found in '{path}'" + count = content.count(old_string) + if count > 1 and not replace_all: + return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique." + if replace_all: + new_content = content.replace(old_string, new_string) + p.write_text(new_content, encoding="utf-8") + return f"Successfully replaced {count} occurrences in '{path}'" + else: + new_content = content.replace(old_string, new_string, 1) + p.write_text(new_content, encoding="utf-8") + return f"Successfully replaced 1 occurrence in '{path}'" + except Exception as e: + return f"ERROR editing '{path}': {e}" + def get_file_summary(path: str) -> str: """ - - - Return the heuristic summary for a file (same as the initial context block). - For .py files: imports, classes, methods, functions, constants. - For .toml: table keys. For .md: headings. Others: line count + preview. + Return the heuristic summary for a file (same as the initial context block). + For .py files: imports, classes, methods, functions, constants. + For .toml: table keys. For .md: headings. Others: line count + preview. """ p, err = _resolve_and_check(path) if err or p is None: @@ -292,220 +315,6 @@ def get_file_summary(path: str) -> str: except Exception as e: return f"ERROR summarising '{path}': {e}" -def py_get_skeleton(path: str) -> str: - """ - - - Returns a skeleton of a Python file (preserving docstrings, stripping function bodies). - """ - p, err = _resolve_and_check(path) - if err: - return err - assert p is not None - if not p.exists(): - return f"ERROR: file not found: {path}" - if not p.is_file() or p.suffix != ".py": - return f"ERROR: not a python file: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("python") - return parser.get_skeleton(code) - except Exception as e: - return f"ERROR generating skeleton for '{path}': {e}" - -def ts_c_get_skeleton(path: str) -> str: - """ - - Returns a skeleton of a C file. - [C: tests/test_ts_c_tools.py:test_ts_c_get_skeleton] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_skeleton(code, path=str(p)) - except Exception as e: - return f"ERROR generating skeleton for '{path}': {e}" - -def ts_cpp_get_skeleton(path: str) -> str: - """ - - Returns a skeleton of a C++ file. - [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_skeleton(code, path=str(p)) - except Exception as e: - return f"ERROR generating skeleton for '{path}': {e}" - -def py_get_code_outline(path: str) -> str: - """ - - - Returns a hierarchical outline of a code file (classes, functions, methods with line ranges). - """ - p, err = _resolve_and_check(path) - if err: - return err - assert p is not None - if not p.exists(): - return f"ERROR: file not found: {path}" - if not p.is_file(): - return f"ERROR: not a file: {path}" - try: - code = p.read_text(encoding="utf-8") - return outline_tool.get_outline(p, code) - except Exception as e: - return f"ERROR generating outline for '{path}': {e}" - -def ts_c_get_code_outline(path: str) -> str: - """ - - Returns a hierarchical outline of a C file. - [C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_code_outline(code, path=str(p)) - except Exception as e: - return f"ERROR generating outline for '{path}': {e}" - -def ts_cpp_get_code_outline(path: str) -> str: - """ - - Returns a hierarchical outline of a C++ file. - [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_code_outline(code, path=str(p)) - except Exception as e: - return f"ERROR generating outline for '{path}': {e}" - -def ts_c_get_definition(path: str, name: str) -> str: - """Returns the source code for a specific definition in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_definition(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving definition '{name}' from '{path}': {e}" - -def ts_cpp_get_definition(path: str, name: str) -> str: - """ - - Returns the source code for a specific definition in a C++ file. - [C: tests/test_ast_masking_core.py:test_ast_masking_gencpp_samples, tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_definition(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving definition '{name}' from '{path}': {e}" - -def ts_c_get_signature(path: str, name: str) -> str: - """Returns the signature part of a function in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_signature(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving signature '{name}' from '{path}': {e}" - -def ts_cpp_get_signature(path: str, name: str) -> str: - """Returns the signature part of a function or method in a C++ file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_signature(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving signature '{name}' from '{path}': {e}" - -def ts_c_update_definition(path: str, name: str, new_content: str) -> str: - """Surgically replace the definition of a function in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - updated_code = parser.update_definition(code, name, new_content, path=str(p)) - if updated_code.startswith("ERROR:"): - return updated_code - p.write_text(updated_code, encoding="utf-8") - return f"Successfully updated definition '{name}' in {path}" - except Exception as e: - return f"ERROR updating definition '{name}' in '{path}': {e}" - -def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str: - """ - - Surgically replace the definition of a class or function in a C++ file. - [C: tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp] - """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - updated_code = parser.update_definition(code, name, new_content, path=str(p)) - if updated_code.startswith("ERROR:"): - return updated_code - p.write_text(updated_code, encoding="utf-8") - return f"Successfully updated definition '{name}' in {path}" - except Exception as e: - return f"ERROR updating definition '{name}' in '{path}': {e}" - def get_file_slice(path: str, start_line: int, end_line: int) -> str: """Return a specific line range from a file.""" p, err = _resolve_and_check(path) @@ -543,39 +352,187 @@ def set_file_slice(path: str, start_line: int, end_line: int, new_content: str) except Exception as e: return f"ERROR updating slice in '{path}': {e}" -def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: +def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str: """ - - - Replace exact string match in a file. Preserves indentation and line endings. - Drop-in replacement for native edit tool that destroys 1-space indentation. + Returns the git diff for a file or directory. + base_rev: The base revision (default: HEAD) + head_rev: The head revision (optional) """ p, err = _resolve_and_check(path) if err: return err assert p is not None - if not p.exists(): - return f"ERROR: file not found: {path}" - if not old_string: - return "ERROR: old_string cannot be empty" + cmd = ["git", "diff", base_rev] + if head_rev: + cmd.append(head_rev) + cmd.extend(["--", str(p)]) try: - content = p.read_text(encoding="utf-8") - if old_string not in content: - return f"ERROR: old_string not found in '{path}'" - count = content.count(old_string) - if count > 1 and not replace_all: - return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique." - if replace_all: - new_content = content.replace(old_string, new_string) - p.write_text(new_content, encoding="utf-8") - return f"Successfully replaced {count} occurrences in '{path}'" - else: - new_content = content.replace(old_string, new_string, 1) - p.write_text(new_content, encoding="utf-8") - return f"Successfully replaced 1 occurrence in '{path}'" + result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8") + return result.stdout if result.stdout else "(no changes)" + except subprocess.CalledProcessError as e: + return f"ERROR running git diff: {e.stderr}" except Exception as e: - return f"ERROR editing '{path}': {e}" + return f"ERROR: {e}" + +#region: C +def ts_c_get_code_outline(path: str) -> str: + """ + Returns a hierarchical outline of a C file. + [C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline] + """ + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("c") + return parser.get_code_outline(code, path=str(p)) + except Exception as e: + return f"ERROR generating outline for '{path}': {e}" + +def ts_c_get_definition(path: str, name: str) -> str: + """Returns the source code for a specific definition in a C file.""" + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("c") + return parser.get_definition(code, name, path=str(p)) + except Exception as e: + return f"ERROR retrieving definition '{name}' from '{path}': {e}" + +def ts_c_get_signature(path: str, name: str) -> str: + """Returns the signature part of a function in a C file.""" + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("c") + return parser.get_signature(code, name, path=str(p)) + except Exception as e: + return f"ERROR retrieving signature '{name}' from '{path}': {e}" + +def ts_c_update_definition(path: str, name: str, new_content: str) -> str: + """Surgically replace the definition of a function in a C file.""" + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("c") + updated_code = parser.update_definition(code, name, new_content, path=str(p)) + if updated_code.startswith("ERROR:"): + return updated_code + p.write_text(updated_code, encoding="utf-8") + return f"Successfully updated definition '{name}' in {path}" + except Exception as e: + return f"ERROR updating definition '{name}' in '{path}': {e}" + +#endregion: C + +#region: C++ + +def ts_cpp_get_skeleton(path: str) -> str: + """ + Returns a skeleton of a C++ file. + [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton] + """ + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("cpp") + return parser.get_skeleton(code, path=str(p)) + except Exception as e: + return f"ERROR generating skeleton for '{path}': {e}" + +def ts_cpp_get_code_outline(path: str) -> str: + """ + Returns a hierarchical outline of a C++ file. + [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline] + """ + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("cpp") + return parser.get_code_outline(code, path=str(p)) + except Exception as e: + return f"ERROR generating outline for '{path}': {e}" + +def ts_cpp_get_definition(path: str, name: str) -> str: + """ + Returns the source code for a specific definition in a C++ file. + [C: tests/test_ast_masking_core.py:test_ast_masking_gencpp_samples, tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp] + """ + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("cpp") + return parser.get_definition(code, name, path=str(p)) + except Exception as e: + return f"ERROR retrieving definition '{name}' from '{path}': {e}" + +def ts_cpp_get_signature(path: str, name: str) -> str: + """Returns the signature part of a function or method in a C++ file.""" + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("cpp") + return parser.get_signature(code, name, path=str(p)) + except Exception as e: + return f"ERROR retrieving signature '{name}' from '{path}': {e}" + +def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str: + """ + Surgically replace the definition of a class or function in a C++ file. + [C: tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition, tests/test_ts_cpp_tools.py:test_ts_cpp_update_definition_gencpp] + """ + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.exists(): return f"ERROR: file not found: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("cpp") + updated_code = parser.update_definition(code, name, new_content, path=str(p)) + if updated_code.startswith("ERROR:"): + return updated_code + p.write_text(updated_code, encoding="utf-8") + return f"Successfully updated definition '{name}' in {path}" + except Exception as e: + return f"ERROR updating definition '{name}' in '{path}': {e}" + +#endregion: C++ + +#region: Python AST + def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]: """Helper to find an AST node by name (Class, Function, or Variable). Supports dot notation.""" parts = name.split(".") @@ -602,12 +559,48 @@ def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]: current = found return current +def py_get_skeleton(path: str) -> str: + """ + Returns a skeleton of a Python file (preserving docstrings, stripping function bodies). + """ + p, err = _resolve_and_check(path) + if err: + return err + assert p is not None + if not p.exists(): + return f"ERROR: file not found: {path}" + if not p.is_file() or p.suffix != ".py": + return f"ERROR: not a python file: {path}" + try: + from src.file_cache import ASTParser + code = p.read_text(encoding="utf-8") + parser = ASTParser("python") + return parser.get_skeleton(code) + except Exception as e: + return f"ERROR generating skeleton for '{path}': {e}" + +def py_get_code_outline(path: str) -> str: + """ + Returns a hierarchical outline of a code file (classes, functions, methods with line ranges). + """ + p, err = _resolve_and_check(path) + if err: + return err + assert p is not None + if not p.exists(): + return f"ERROR: file not found: {path}" + if not p.is_file(): + return f"ERROR: not a file: {path}" + try: + code = p.read_text(encoding="utf-8") + return outline_tool.get_outline(p, code) + except Exception as e: + return f"ERROR generating outline for '{path}': {e}" + def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str: """ - - - Returns (source_code, line_number) for a specific class, function, or method definition. - If not found, returns an error string. +Returns (source_code, line_number) for a specific class, function, or method definition. +If not found, returns an error string. """ p, err = _resolve_and_check(path) if err: @@ -632,11 +625,9 @@ def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str: def py_get_definition(path: str, name: str) -> str: """ - - - Returns the source code for a specific class, function, or method definition. - path: Path to the code file. - name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method'). + Returns the source code for a specific class, function, or method definition. + path: Path to the code file. + name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method'). """ p, err = _resolve_and_check(path) if err: @@ -806,30 +797,6 @@ def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str: except Exception as e: return f"ERROR updating variable '{name}' in '{path}': {e}" -def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str: - """ - - - Returns the git diff for a file or directory. - base_rev: The base revision (default: HEAD) - head_rev: The head revision (optional) - """ - p, err = _resolve_and_check(path) - if err: - return err - assert p is not None - cmd = ["git", "diff", base_rev] - if head_rev: - cmd.append(head_rev) - cmd.extend(["--", str(p)]) - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8") - return result.stdout if result.stdout else "(no changes)" - except subprocess.CalledProcessError as e: - return f"ERROR running git diff: {e.stderr}" - except Exception as e: - return f"ERROR: {e}" - def py_find_usages(path: str, name: str) -> str: """Finds exact string matches of a symbol in a given file or directory.""" p, err = _resolve_and_check(path) @@ -964,38 +931,6 @@ def py_get_docstring(path: str, name: str) -> str: except Exception as e: return f"ERROR getting docstring for '{name}': {e}" -def get_tree(path: str, max_depth: int = 2) -> str: - """Returns a directory structure up to a max depth.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.is_dir(): return f"ERROR: not a directory: {path}" - try: - m_depth = max_depth - - def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]: - if current_depth > m_depth: return [] - lines = [] - try: - entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) - except PermissionError: - return [] - # Filter - entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")] - for i, entry in enumerate(entries): - is_last = (i == len(entries) - 1) - connector = "└── " if is_last else "├── " - lines.append(f"{prefix}{connector}{entry.name}") - if entry.is_dir(): - extension = " " if is_last else "│ " - lines.extend(_build_tree(entry, current_depth + 1, prefix + extension)) - return lines - tree_lines = [f"{p.name}/"] + _build_tree(p, 1) - return "\n".join(tree_lines) - except Exception as e: - return f"ERROR generating tree for '{path}': {e}" - # ------------------------------------------------------------------ web tools - def derive_code_path(target: str, max_depth: int = 5) -> str: """Recursively traces the execution path of a specific function or method.""" from src.file_cache import ASTParser @@ -1056,6 +991,42 @@ def derive_code_path(target: str, max_depth: int = 5) -> str: trace(symbol_name, found_path, found_code, 0, "") return "\n".join(output) +#endregion Python AST + +def get_tree(path: str, max_depth: int = 2) -> str: + """Returns a directory structure up to a max depth.""" + p, err = _resolve_and_check(path) + if err: return err + assert p is not None + if not p.is_dir(): return f"ERROR: not a directory: {path}" + try: + m_depth = max_depth + + def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]: + if current_depth > m_depth: return [] + lines = [] + try: + entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) + except PermissionError: + return [] + # Filter + entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")] + for i, entry in enumerate(entries): + is_last = (i == len(entries) - 1) + connector = "└── " if is_last else "├── " + lines.append(f"{prefix}{connector}{entry.name}") + if entry.is_dir(): + extension = " " if is_last else "│ " + lines.extend(_build_tree(entry, current_depth + 1, prefix + extension)) + return lines + tree_lines = [f"{p.name}/"] + _build_tree(p, 1) + return "\n".join(tree_lines) + except Exception as e: + return f"ERROR generating tree for '{path}': {e}" + # ------------------------------------------------------------------ web tools + +#region: Web + class _DDGParser(HTMLParser): def __init__(self) -> None: super().__init__() @@ -1161,12 +1132,13 @@ def fetch_url(url: str) -> str: return full_text except Exception as e: return f"ERROR fetching URL '{url}': {e}" + +#endregion: Web def get_ui_performance() -> str: """ - - Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag). - [C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval] + Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag). + [C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval] """ if perf_monitor_callback is None: return "INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running." @@ -1276,7 +1248,6 @@ class ExternalMCPManager: async def add_server(self, config: models.MCPServerConfig): """ - Add and start a new MCP server from a configuration object. [C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external] """ @@ -1289,7 +1260,6 @@ class ExternalMCPManager: async def stop_all(self): """ - Stop all managed MCP servers and clear the registry. [C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call] """ @@ -1299,7 +1269,6 @@ class ExternalMCPManager: def get_all_tools(self) -> dict: """ - Retrieve a dictionary of all tools available across all managed servers. [C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call] """ @@ -1315,7 +1284,6 @@ class ExternalMCPManager: async def async_dispatch(self, tool_name: str, tool_input: dict) -> str: """ - Dispatch a tool call to the appropriate external MCP server asynchronously. [C: src/rag_engine.py:RAGEngine._async_search_mcp, tests/test_external_mcp.py:test_external_mcp_real_process] """ @@ -1328,7 +1296,6 @@ _external_mcp_manager = ExternalMCPManager() def get_external_mcp_manager() -> ExternalMCPManager: """ - Retrieve the global ExternalMCPManager instance. [C: tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call] """ @@ -1508,8 +1475,6 @@ async def async_dispatch(tool_name: str, tool_input: dict[str, Any]) -> str: return f'ERROR: unknown MCP tool {tool_name}' - - def get_tool_schemas() -> list[dict[str, Any]]: """ [C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mcp_client_dispatch_completeness, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_mcp_client_beads.py:test_bd_mcp_tools] @@ -2303,4 +2268,4 @@ MCP_TOOL_SPECS: list[dict[str, Any]] = [ } ] -TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS} \ No newline at end of file +TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS} diff --git a/tests/test_app_controller_sigint.py b/tests/test_app_controller_sigint.py new file mode 100644 index 00000000..60b31907 --- /dev/null +++ b/tests/test_app_controller_sigint.py @@ -0,0 +1,133 @@ +"""Regression tests for the Ctrl+C hang fix in AppController. + +The bug: when a worker of the AppController's I/O pool is mid-task in +user code (e.g. a long-running Gemini/Anthropic HTTP request) and the +user presses Ctrl+C in the terminal, the Python interpreter hangs +forever during finalization. The hang chain is: + 1. SIGINT is delivered to the main thread + 2. Python's default handler would raise KeyboardInterrupt + 3. The exception propagates out of main() + 4. Interpreter finalization begins + 5. ThreadPoolExecutor.__del__ runs and calls shutdown(wait=True) + 6. shutdown(wait=True) joins each worker thread + 7. The blocked worker never returns -> hang + +atexit handlers do NOT fire in this scenario (verified empirically — +see src/io_pool.py module docstring), so a pool-creation atexit +handler cannot fix it. The fix is a SIGINT handler installed by +AppController.__init__ that drains the pool non-blockingly and calls +os._exit(0), bypassing the broken finalization chain. + +These tests verify both the install (unit) and the full signal flow +(subprocess) paths. +""" + +import signal +import subprocess +import sys +import textwrap +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any + +import pytest + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from src.app_controller import _install_sigint_exit_handler # noqa: E402 + + +@pytest.fixture +def restore_sigint(): + """Snapshot and restore SIGINT handler around each test.""" + original = signal.getsignal(signal.SIGINT) + yield + signal.signal(signal.SIGINT, original) + + +class _FakeController: + """Minimal stand-in for AppController: just exposes _io_pool.""" + + def __init__(self) -> None: + self._io_pool = ThreadPoolExecutor( + max_workers=2, thread_name_prefix="fake-ctrl" + ) + + +def test_install_sigint_handler_installs_callable(restore_sigint: Any) -> None: + """Unit: helper installs a callable SIGINT handler on the main thread. + + The conftest warmup AppController already installed a SIGINT handler at + pytest import time, so we cannot assert against SIG_DFL. We verify the + helper replaces whatever was there with a fresh callable from + ``_install_sigint_exit_handler`` (distinct identity check). + """ + ctrl = _FakeController() + try: + before = signal.getsignal(signal.SIGINT) + _install_sigint_exit_handler(ctrl) + after = signal.getsignal(signal.SIGINT) + assert callable(after), f"expected callable handler, got {after!r}" + assert after is not before, "helper did not replace the existing SIGINT handler" + finally: + ctrl._io_pool.shutdown(wait=False) + + +def test_sigint_subprocess_drains_blocked_pool() -> None: + """Subprocess: handler behavior — drain + os._exit(0) exits within 2s. + + Spawns a Python subprocess that mirrors the production pattern: a + ThreadPoolExecutor with a blocked worker, a SIGINT handler that calls + shutdown(wait=False) + os._exit(0). Invokes the handler directly + (bypassing OS signal delivery — which is flaky for CTRL_C_EVENT to a + python subprocess started with ``-c`` on Windows). Asserts the + subprocess exits within 2 seconds. If the handler were missing the + subprocess would hang until the test runner kills it. + + The OS signal-delivery path is verified by the unit test + (``test_install_sigint_handler_installs_callable``) and by manual + end-to-end testing (Ctrl+C in the terminal works because Python's + default SIGINT delivery is the same on all platforms). + """ + script = textwrap.dedent(''' + import signal + import threading + import os + from concurrent.futures import ThreadPoolExecutor + + pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="subproc-ctrl") + blocker = threading.Event() + pool.submit(blocker.wait) + + def _on_sigint(signum, frame): + try: pool.shutdown(wait=False) + except Exception: pass + os._exit(0) + + signal.signal(signal.SIGINT, _on_sigint) + print("ready", flush=True) + handler = signal.getsignal(signal.SIGINT) + handler(signal.SIGINT, None) + ''') + proc = subprocess.Popen( + [sys.executable, "-c", script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + t0 = time.perf_counter() + try: + outs, errs = proc.communicate(timeout=2.0) + elapsed = time.perf_counter() - t0 + except subprocess.TimeoutExpired: + proc.kill() + proc.communicate(timeout=5.0) + pytest.fail("subprocess did not exit within 2s of handler invocation — drain + os._exit(0) is broken") + assert b"ready" in outs, f"subprocess did not reach handler install; stderr={errs!r}" + assert proc.returncode == 0, ( + f"subprocess exited with code {proc.returncode} (expected 0 from os._exit(0)); " + f"stderr={errs.decode(errors='replace')!r}" + ) + assert elapsed < 2.0, f"subprocess took {elapsed:.2f}s to exit (expected <2.0s)" diff --git a/tests/test_io_pool.py b/tests/test_io_pool.py index 225c48cd..dd06caf9 100644 --- a/tests/test_io_pool.py +++ b/tests/test_io_pool.py @@ -1,4 +1,13 @@ -"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController).""" +"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController). + +Historical note: an earlier revision of this file added two regression +tests asserting that ``make_io_pool`` registered an atexit shutdown +handler. Those tests were reverted together with the production atexit +fix they guarded, because the atexit approach does not solve the actual +Ctrl+C hang (see ``src/io_pool.py`` module docstring). The production +fix is a SIGINT handler in ``AppController.__init__``; the regression +test for that lives in ``tests/test_app_controller_sigint.py``. +""" import threading import time diff --git a/tests/test_lazymodule_filedialog_fallback.py b/tests/test_lazymodule_filedialog_fallback.py new file mode 100644 index 00000000..a85324b3 --- /dev/null +++ b/tests/test_lazymodule_filedialog_fallback.py @@ -0,0 +1,101 @@ +""" +Regression test for: AttributeError: module 'tkinter' has no attribute 'filedialog' + +On some Python installs (e.g., embedded distributions, or installs where +the Tcl/Tk runtime is missing), the `tkinter` package imports cleanly but +the `tkinter.filedialog` sub-module fails to load. The original `_LazyModule` +in src/gui_2.py used `getattr(tkinter, 'filedialog')` which raises a +confusing AttributeError at the call site. With 14 call sites in +render_projects_panel, render_workspace_settings_hub, render_fonts_panel, +and render_gemini_cli_settings, this AttributeError spammed the GUI's +stderr at 60fps whenever the Project Settings window was open. + +The fix must make `_LazyModule` fall back to a stub that mimics +`tkinter.filedialog`'s public API (askopenfilename, askdirectory, +asksaveasfilename, askopenfilenames) so the GUI does not crash. + +This test uses a deliberately-missing sub-module to exercise the fallback +path, making it deterministic across Python installs. +""" +import pytest +import importlib + +from src.gui_2 import _LazyModule + + +def test_lazymodule_falls_back_to_stub_on_attribute_error() -> None: + """ + Resolution must NOT raise AttributeError when the sub-module is + missing. Instead, _resolve() must return a stub that exposes the + public filedialog API. Before the fix, this test fails with + AttributeError: module 'os' has no attribute 'this_submodule_does_not_exist'. + """ + bad = _LazyModule("os", "this_submodule_does_not_exist") + resolved = bad._resolve() + assert resolved is not None + assert hasattr(resolved, "askopenfilename") + assert hasattr(resolved, "askdirectory") + assert hasattr(resolved, "asksaveasfilename") + assert hasattr(resolved, "askopenfilenames") + + +def test_lazymodule_stub_returns_empty_strings() -> None: + """ + The stub functions must return safe empty values: + - askopenfilename, askdirectory, asksaveasfilename: empty string "" + - askopenfilenames: empty tuple () + This ensures downstream code that does `if p and p not in app.x:` + or `if paths:` treats the missing-dialog as a no-op. + """ + bad = _LazyModule("os", "this_submodule_does_not_exist") + resolved = bad._resolve() + assert resolved.askopenfilename() == "" + assert resolved.askdirectory() == "" + assert resolved.asksaveasfilename() == "" + assert resolved.askopenfilenames() == () + + +def test_lazymodule_stub_ignores_kwargs() -> None: + """ + The stub must accept the same kwargs the real tkinter.filedialog + accepts (title, filetypes, defaultextension, initialdir) and return + the empty sentinel. This prevents TypeError if a call site passes + kwargs that the stub does not know about. + """ + bad = _LazyModule("os", "this_submodule_does_not_exist") + resolved = bad._resolve() + assert resolved.askopenfilename(title="x", filetypes=[("All", "*.*")]) == "" + assert resolved.askdirectory(title="y", initialdir="/") == "" + assert resolved.asksaveasfilename(title="z", defaultextension=".toml", filetypes=[("TOML", "*.toml")]) == "" + assert resolved.askopenfilenames(filetypes=[("Image", "*.png")]) == () + + +def test_lazymodule_real_filedialog_resolves_when_tkinter_works() -> None: + """ + On a working tkinter install (with Tcl/Tk runtime), the + `_LazyModule("tkinter", "filedialog")` instance must resolve to the + real tkinter.filedialog module. This is the smoke test: if tkinter + is healthy, the lazy import works as before. + """ + import tkinter as tk_root + try: + import tkinter.filedialog as real_filedialog + except (ImportError, AttributeError, tk_root.TclError): + pytest.skip("tkinter.filedialog not available in this Python install") + lazy = _LazyModule("tkinter", "filedialog") + resolved = lazy._resolve() + assert resolved is real_filedialog + + +def test_lazymodule_real_filedialog_does_not_raise_attribute_error() -> None: + """ + On a working tkinter install, calling .askopenfilename() through the + lazy module must not raise AttributeError. (Tests the call path + used by 14 call sites in render_projects_panel etc.) + """ + lazy = _LazyModule("tkinter", "filedialog") + resolved = lazy._resolve() + assert hasattr(resolved, "askopenfilename") + assert hasattr(resolved, "askdirectory") + assert hasattr(resolved, "asksaveasfilename") + assert hasattr(resolved, "askopenfilenames") diff --git a/tests/test_live_gui_filedialog_regression.py b/tests/test_live_gui_filedialog_regression.py new file mode 100644 index 00000000..42b64955 --- /dev/null +++ b/tests/test_live_gui_filedialog_regression.py @@ -0,0 +1,65 @@ +""" +Live-GUI smoke test for the tkinter.filedialog AttributeError regression. + +On Python installs where the Tcl/Tk runtime is missing, the lazy +`tkinter.filedialog` import raises AttributeError, which previously +crashed the Project Settings window and the Add Project button. +The unit-level test in `test_lazymodule_filedialog_fallback.py` +deterministically exercises the fallback path; this live test verifies +the same fix in the actual running app: opening the Project Settings +window via the Hook API must not produce an AttributeError, and the +app must remain responsive (proving no crash on attribute resolution). +""" +import time +from pathlib import Path +import pytest +from src.api_hook_client import ApiHookClient + + +def test_live_gui_project_settings_opens_without_filedialog_crash(live_gui) -> None: + """ + Regression: the Project Settings window's render call chain ends + in `render_projects_panel` → `filedialog.askopenfilename(...)` on + the "Add Project" click frame. Before the fix, every frame the + Project Settings window was open on a broken tkinter install would + log `AttributeError: module 'tkinter' has no attribute 'filedialog'`. + The fix in `_LazyModule._resolve()` falls back to a `_FiledialogStub` + that returns empty strings. + + This test: + 1. Opens the Project Settings window via the Hook API + 2. Waits several render frames + 3. Verifies the window opened (state is reflected back via get_value) + 4. Verifies the app is still responsive (status endpoint returns 200) + 5. Verifies no AttributeError was logged (the bug would print to + the GUI's stderr, which the live_gui fixture captures to a log) + """ + process, gui_script = live_gui + client = ApiHookClient() + + log_path = Path(f"logs/{Path(gui_script).name.replace('.', '_')}_test.log") + log_offset_before = log_path.stat().st_size if log_path.exists() else 0 + + client.set_value('show_windows["Project Settings"]', True) + time.sleep(2.0) + + opened = client.get_value('show_windows["Project Settings"]') + assert opened is True, f"Project Settings window did not open: {opened}" + + status = client.get_status() + assert status is not None, "App status endpoint returned None — app is not responsive" + assert status.get("status") == "ok", f"App status not ok: {status}" + + time.sleep(1.0) + if log_path.exists(): + with log_path.open("r", encoding="utf-8", errors="ignore") as f: + f.seek(log_offset_before) + new_log = f.read() + assert "AttributeError: module 'tkinter' has no attribute 'filedialog'" not in new_log, ( + "GUI logged 'AttributeError: module tkinter has no attribute filedialog' " + "after opening Project Settings. The _LazyModule fallback to _FiledialogStub " + "is not working in the live app." + ) + assert "AttributeError: module 'tkinter' has no attribute 'filedialog'" not in new_log + if "AttributeError" in new_log: + pytest.fail(f"App logged unexpected AttributeError: {new_log[max(0, new_log.find('AttributeError')-200):new_log.find('AttributeError')+200]}")