import pytest
import asyncio
import subprocess
import time
import requests
import os
import signal
import sys
import datetime
from pathlib import Path
from typing import Generator, Any
from unittest.mock import patch, MagicMock

# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import ai_client
from gui_2 import App


@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
    """Reset ai_client global state between every test to prevent state pollution."""
    ai_client.reset_session()
    # Default to a safe model
    ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
    yield


def _shutdown_app_loop(app: App) -> None:
    """Stop the app's asyncio loop and drain its pending tasks.

    The loop is stopped and its background thread joined *before* any task is
    touched: asyncio tasks and loops are not thread-safe, so cancelling them
    from the test thread is only done once the loop is no longer running.
    If the loop cannot be stopped within the join timeout, task cleanup is
    skipped instead of raising from ``run_until_complete``.
    """
    loop = getattr(app, '_loop', None)
    if loop is None or loop.is_closed():
        return

    # Ask the loop to stop from its own thread, then wait for that thread.
    if loop.is_running():
        loop.call_soon_threadsafe(loop.stop)
    loop_thread = getattr(app, '_loop_thread', None)
    if loop_thread is not None and loop_thread.is_alive():
        loop_thread.join(timeout=2.0)

    if loop.is_running():
        # run_until_complete() would raise RuntimeError on a running loop.
        print("[Fixture] App event loop did not stop in time; skipping task cleanup.")
        return

    # The loop is idle now, so it is safe to drive it from this thread.
    pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
    if not pending:
        return
    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind; exceptions are collected, not raised.
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))


@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.

    Yields:
        An ``App`` constructed while config loading/saving, the project
        manager, the session logger, the GUI runner and several ``App``
        initialisation methods are patched out.
    """
    with (
        patch('gui_2.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch.object(App, '_prune_old_logs'),
        patch.object(App, '_init_ai_and_hooks')
    ):
        app = App()
        yield app
        # Cleanup: ensure the asyncio loop is stopped and tasks are cancelled.
        _shutdown_app_loop(app)


@pytest.fixture
def mock_app(app_instance: App) -> App:
    """
    Simpler fixture returning a mocked App instance.
    Reuses app_instance for automatic cleanup and consistent mocking.
    """
    return app_instance


class VerificationLogger:
    """High-signal reporting for automated tests, inspired by Unreal Engine's diagnostic style.

    Collects before/after state entries in memory and, on ``finalize``,
    appends them as a fixed-width table to
    ``logs/test/<timestamp>/<script_name>.txt``.
    """

    def __init__(self, test_name: str, script_name: str) -> None:
        """Create the timestamped log directory and remember the target file.

        Args:
            test_name: Name printed in the report header and summary line.
            script_name: Stem of the report file inside the log directory.
        """
        self.test_name = test_name
        self.script_name = script_name
        self.logs_dir = Path(f"logs/test/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.log_file = self.logs_dir / f"{script_name}.txt"
        self.entries: list[dict[str, str]] = []

    def log_state(self, field: str, before: Any, after: Any, delta: Any = None) -> None:
        """Record one before/after observation and echo it to stdout.

        Args:
            field: Label of the observed value.
            before: Value before the action (stored as ``str``).
            after: Value after the action (stored as ``str``).
            delta: Optional difference; stored as an empty string when None.
        """
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": str(delta) if delta is not None else ""
        })
        # Also print to stdout for real-time visibility in CI
        print(f"[STATE] {field}: {before} -> {after}")

    def finalize(self, description: str, status: str, result_msg: str) -> None:
        """Append the collected entries and a summary line to the log file.

        Args:
            description: Free-text description written under the header.
            status: Status token written before the test name (e.g. "PASS").
            result_msg: Short message explaining the result.
        """
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({description})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write("-" * 80 + "\n")
            for e in self.entries:
                f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
            f.write("-" * 80 + "\n")
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")


@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Fixture to provide a VerificationLogger instance to a test."""
    test_name = request.node.name
    script_name = Path(request.node.fspath).stem
    return VerificationLogger(test_name, script_name)


def kill_process_tree(pid: int | None) -> None:
    """Robustly kills a process and all its children.

    On Windows this delegates to ``taskkill /F /T``. On other platforms the
    process group of ``pid`` is sent SIGKILL, unless that group is the
    caller's own group, in which case only ``pid`` itself is killed so the
    test runner does not terminate itself.

    Args:
        pid: Process id to kill; ``None`` is a no-op.

    Errors are reported on stdout and never raised (best-effort cleanup).
    """
    if pid is None:
        return
    try:
        print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
        if os.name == 'nt':
            # /F is force, /T is tree (includes children)
            subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
        else:
            pgid = os.getpgid(pid)
            if pgid == os.getpgrp():
                # The target shares our process group (it was not started in
                # its own session): killing the group would kill this process.
                os.kill(pid, signal.SIGKILL)
            else:
                # On Unix, kill the process group
                os.killpg(pgid, signal.SIGKILL)
        print(f"[Fixture] Process tree {pid} killed.")
    except ProcessLookupError:
        print(f"[Fixture] Process {pid} had already exited.")
    except Exception as e:
        print(f"[Fixture] Error killing process tree {pid}: {e}")


@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
    """
    Session-scoped fixture that starts gui_2.py with --enable-test-hooks.
    Includes high-signal environment telemetry.

    Yields:
        ``(process, gui_script)`` once the hook server answers HTTP 200 on
        ``http://127.0.0.1:8999/status``.

    Fails the test session (``pytest.fail``) if the server does not respond
    within the startup timeout or the process dies during startup. The GUI
    process tree is killed and the log file closed on every exit path.
    """
    gui_script = "gui_2.py"
    status_url = "http://127.0.0.1:8999/status"
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    diag.log_state("GUI Script", "N/A", gui_script)

    # Check if already running (shouldn't be)
    try:
        resp = requests.get(status_url, timeout=0.1)
        already_up = resp.status_code == 200
    except requests.exceptions.RequestException:
        already_up = False
    diag.log_state("Hook Server Port 8999", "Down", "UP" if already_up else "Down")

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks...")
    os.makedirs("logs", exist_ok=True)
    log_file = open(f"logs/{gui_script.replace('.', '_')}_test.log", "w", encoding="utf-8")
    try:
        process = subprocess.Popen(
            ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
            stdout=log_file,
            stderr=log_file,
            text=True,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0,
            # Give the child its own session/process group on POSIX so that
            # kill_process_tree() can kill the whole tree without touching
            # the test runner's group.
            start_new_session=os.name != 'nt',
        )
    except Exception:
        # Launch failed (e.g. `uv` missing): don't leak the log handle.
        log_file.close()
        raise
    diag.log_state("GUI Process PID", "N/A", process.pid)

    max_retries = 15  # startup budget in seconds
    ready = False
    print(f"[Fixture] Waiting up to {max_retries}s for Hook Server on port 8999...")
    start_time = time.monotonic()
    try:
        while time.monotonic() - start_time < max_retries:
            try:
                response = requests.get(status_url, timeout=0.5)
                if response.status_code == 200:
                    ready = True
                    print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.monotonic() - start_time, 2)}s.")
                    break
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                if process.poll() is not None:
                    print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                    break
            time.sleep(0.5)
    except BaseException:
        # Interrupted or unexpected error while waiting: the yield/finally
        # below is never reached, so clean up here before propagating.
        kill_process_tree(process.pid)
        log_file.close()
        raise

    diag.log_state("Startup Success", "N/A", str(ready))
    diag.log_state("Startup Time", "N/A", f"{round(time.monotonic() - start_time, 2)}s")

    if not ready:
        diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
        print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
        kill_process_tree(process.pid)
        log_file.close()
        pytest.fail(f"Failed to start {gui_script} with test hooks.")

    diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

    try:
        yield process, gui_script
    finally:
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        # Reset the GUI state before shutting down (best effort).
        try:
            from api_hook_client import ApiHookClient
            client = ApiHookClient()
            client.reset_session()
            time.sleep(0.5)
        except Exception as e:
            print(f"[Fixture] Session reset before shutdown failed (ignored): {e}")
        kill_process_tree(process.pid)
        # Reap the child so it does not linger as a zombie.
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            print(f"[Fixture] {gui_script} (PID {process.pid}) did not exit within 5s after kill.")
        log_file.close()