import pytest
import subprocess
import threading
import time
import requests
import os
import signal
import sys
import datetime
import shutil
from pathlib import Path
from typing import Generator, Any
from unittest.mock import patch

# Make the repository root importable so `src.*` resolves when pytest is
# launched from any working directory.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

# One workspace per pytest invocation. Under pytest-xdist every worker imports
# this conftest in its own process, so a per-process timestamp can differ
# between workers that start across a second boundary; they would then use
# different workspaces (and different owner-lock files). Prefer the run-wide id
# that xdist exports to its workers and fall back to a timestamp otherwise.
_RUN_ID = os.environ.get("PYTEST_XDIST_TESTRUNUID") or datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
_RUN_WORKSPACE = Path(f"tests/artifacts/live_gui_workspace_{_RUN_ID}")

# Vendored third-party packages take precedence over the project root.
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
if thirdparty_dir not in sys.path:
    sys.path.insert(0, thirdparty_dir)

from defer.sugar import install
install()

pytest_plugins = ["pytest_collection_order"]

# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
# and the message in workflow.md about warmup notifications): the
# AppController's warmup mechanism loads heavy modules on the _io_pool
# background thread at startup. Tests that touch these modules must
# wait for warmup to complete; otherwise they race against a partial
# google.genai import and hit "partially initialized" errors.
#
# Wait for the warmup before any test runs. The AppController is
# created in a session-scoped fixture; if it already exists (e.g.,
# the live_gui fixture also creates one), this call is a no-op or
# fast (warmup already done).
#
# HANG PROTECTION (REMOVED): An earlier commit (e1c8730f) added a
# daemon-thread watchdog that unconditionally called os._exit(0) after
# 30s. The intent was to bound hangs from ThreadPoolExecutor.__del__
# and the live_gui fixture teardown. Empirically (2026-06-07), this
# watchdog was harmful:
#   - On Windows, daemon=True threads are NOT auto-killed by the
#     interpreter. The watchdog's time.sleep(30) continues through
#     pytest's normal shutdown, then os._exit(0) fires.
#   - For batches that take >30s (e.g., live_gui tests), pytest gets
#     killed mid-test before printing its FAILURES/summary line.
#   - The os._exit(0) hides pytest's actual exit code, so the
#     run_tests_batched.py runner reports 'Batch N passed' even when
#     tests had failed (e.g., 5 F's in test_ticket_queue).
#
# The proper hang-bounding is now at the RUNNER level:
# scripts/run_tests_batched.py uses subprocess.run(timeout=1000) per
# batch. If pytest hangs, the runner kills it after 1000s and reports
# failure. Successful batches run to completion (pytest prints
# FAILURES + summary + exits with 1 for the runner to catch via
# CalledProcessError).
import atexit
from src.app_controller import AppController

_warmup_app_controller = AppController()
if not _warmup_app_controller.wait_for_warmup(timeout=60.0):
    import warnings
    warnings.warn(
        "AppController warmup did not complete within 60s. "
        "Tests that depend on warmup modules (google.genai, anthropic, "
        "openai, etc.) may fail.",
        RuntimeWarning,
        stacklevel=2,
    )

# HANG PROTECTION (signal-based watchdog). Two observed hang chains
# from e1c8730f and the prior naive watchdog:
#   1. ThreadPoolExecutor.__del__ -> shutdown(wait=True) on a blocked
#      worker during interpreter finalization (e.g., the io_pool
#      created in AppController.__init__ at conftest line 65).
#   2. The session-scoped `live_gui` fixture teardown hanging in
#      client.reset_session() (HTTP call to the hook server) or
#      kill_process_tree(process.pid) / process.wait(timeout=2) waiting
#      for the sloppy.py subprocess to die on Windows.
# The naive 30s os._exit(0) approach CUT OFF BATCHES MID-TEST (every
# batch exited at 32.0s exactly). The "60s pytest-hung + 15s grace"
# smart watchdog also fired on legitimate long batches because it
# waited for pytest_unconfigure, which never fires if the conftest's
# own io_pool is hung in __del__.
#
# CORRECT approach: signal-based.
# Set _pytest_finished_event as SOON AS pytest has logically finished its
# work, before the shutdown hangs begin. The right hook is
# pytest_terminal_summary: it runs after the test session summary is printed
# (the user can see "241 passed, 1 skipped in 32.30s" in the output) but
# BEFORE finalization. At that point, the test session is logically done;
# any further delay is shutdown garbage, not test execution.
#
# Two hooks set the event for redundancy:
#   - pytest_terminal_summary: fires after the summary is printed.
#     This is the primary signal; it also records pytest's exit status.
#   - pytest_unconfigure: fires at the very end, after the summary.
#     Fallback in case the terminal summary hook isn't reached (e.g.,
#     pytest crashes mid-summary).
# After the event fires, give 5s for normal finalization, then os._exit()
# with the exit status pytest reported, so the runner can move to the next
# batch immediately instead of waiting for ThreadPoolExecutor.__del__ to
# unblock (which can take 60+ seconds) and still sees failing batches as
# failures.
#
# For TRUE hangs (the event never fires) the watchdogs below call
# os._exit(2): the smart watchdog after 600s, the unconditional one after
# 900s. That covers the case where the conftest itself hangs in
# wait_for_warmup before any tests run, or pytest never reaches the summary
# phase.
import threading

_pytest_finished_event: threading.Event = threading.Event()

# Exit status reported by pytest in pytest_terminal_summary. Stays 0 when only
# the pytest_unconfigure fallback fired (no status is available there).
_pytest_exit_status: int = 0

# Seconds granted to normal interpreter finalization once the session is done.
_WATCHDOG_GRACE_S: float = 5.0


def pytest_configure(config: object) -> None:
    """
    Pytest session-start hook. Runs required-dependency check before any
    test is collected so the user sees a clear, actionable error if the
    test environment is incomplete.

    [C: tests/test_required_test_dependencies.py:test_check_succeeds_when_deps_present, tests/test_required_test_dependencies.py:test_check_raises_on_missing_sentence_transformers]
    """
    _check_required_test_dependencies()


def pytest_terminal_summary(terminalreporter: object, exitstatus: int, config: object) -> None:
    """Record pytest's exit status and signal the watchdogs that the session is done."""
    global _pytest_exit_status
    # Store the status BEFORE setting the event so a woken watchdog reads it.
    _pytest_exit_status = int(exitstatus)
    _pytest_finished_event.set()


def pytest_unconfigure(config: object) -> None:
    """Fallback signal for the watchdogs when the terminal summary was never reached."""
    _pytest_finished_event.set()


def pytest_collection_modifyitems(config: object, items: list[object]) -> None:
    """
    No-op. The live_gui fixture uses a file-based lock to serialize ownership
    of the sloppy.py subprocess: the first worker to acquire the lock spawns
    it, other workers wait for the hook server and yield a client handle.
    All workers CAN run live_gui tests; the lock prevents port-8999 collisions.

    [C: tests/conftest.py:live_gui fixture]
    """
    return


# (import name, distribution name) pairs the test suite cannot run without.
_REQUIRED_TEST_IMPORTS: list[tuple[str, str]] = [
    ("sentence_transformers", "sentence-transformers"),
]


def _check_required_test_dependencies() -> None:
    """
    Verify that all packages required by the integration test suite are
    importable. Raises pytest.UsageError with a clear fix command if a
    required package is missing.

    This gate is a regression test for the 2026-06-09 incident where
    sentence-transformers was in [project.optional-dependencies] but the
    test suite requires it unconditionally. A fresh `uv sync` (without
    --extra local-rag) produced a confusing "rag_status = error: ...
    Install with manual_slop[local-rag]" failure inside the live_gui
    subprocess. This gate fails fast at session start with the exact fix
    command instead.

    To add a new required test dep, append its import to
    _REQUIRED_TEST_IMPORTS.

    [C: tests/test_required_test_dependencies.py:test_check_succeeds_when_deps_present, tests/test_required_test_dependencies.py:test_check_raises_on_missing_sentence_transformers]
    """
    missing: list[str] = []
    for module_name, package_name in _REQUIRED_TEST_IMPORTS:
        try:
            __import__(module_name)
        except ImportError:
            missing.append(package_name)
    if missing:
        msg = (
            "Required test dependencies are missing from the venv:\n"
            f" - {', '.join(missing)}\n\n"
            "Fix: uv sync --extra local-rag\n"
            "Or, if pyproject.toml already lists the dep in [dependency-groups].dev:\n"
            " uv sync\n"
        )
        raise pytest.UsageError(msg)


def _exit_when_session_done(timeout: float) -> None:
    """Shared watchdog body.

    Waits up to `timeout` seconds for `_pytest_finished_event`. If it never
    fires the process is treated as hung and terminated with status 2.
    Otherwise normal finalization gets `_WATCHDOG_GRACE_S` seconds, after
    which the process is force-exited with the status pytest reported, so a
    failing session is never turned into a passing one.
    """
    if not _pytest_finished_event.wait(timeout=timeout):
        os._exit(2)
    time.sleep(_WATCHDOG_GRACE_S)
    os._exit(_pytest_exit_status)


def _smart_watchdog_exit() -> None:
    """Primary watchdog: 600s hang limit, then force-exit after the session ends."""
    _exit_when_session_done(600.0)


threading.Thread(target=_smart_watchdog_exit, daemon=True, name="conftest-smart-watchdog").start()


def _unconditional_watchdog_exit() -> None:
    """Hard fail-safe: same signal as the smart watchdog, with a 900s limit.

    Because the smart watchdog already terminates the process at 600s when
    the signal never fires, this one only matters if the smart watchdog
    thread is not running for some reason. If the signal never fires (true
    hang), os._exit(2) so the runner catches it as CalledProcessError.
    """
    _exit_when_session_done(900.0)


threading.Thread(target=_unconditional_watchdog_exit, daemon=True, name="conftest-unconditional-watchdog").start()

from src.gui_2 import App


class VerificationLogger:
    """Collects before/after observations for a test and writes them to a text report."""

    def __init__(self, test_name: str, script_name: str) -> None:
        self.test_name = test_name
        self.script_name = script_name
        self.entries: list[dict[str, str]] = []
        self.start_time = time.time()
        # Route artifacts to tests/logs/
        self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    def log_state(self, field: str, before: Any, after: Any) -> None:
        """
        Record one before/after observation; numeric pairs also get a signed delta.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        delta = ""
        if isinstance(before, (int, float)) and isinstance(after, (int, float)):
            diff = after - before
            delta = f"{'+' if diff > 0 else ''}{diff}"
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": delta
        })

    def finalize(self, title: str, status: str, result_msg: str) -> None:
        """
        Write the collected entries to `<logs_dir>/<script_name>.txt` and print a summary line.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        log_file = self.logs_dir / f"{self.script_name}.txt"
        with open(log_file, "w", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({title})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write("-" * 80 + "\n")
            for e in self.entries:
                f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
            f.write("-" * 80 + "\n")
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")


@pytest.fixture(autouse=True)
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
    """
    Autouse fixture to isolate tests from the active user workspace.
    Protects the real config.toml and manual_slop.toml from being overwritten.
    """
    test_workspace = tmp_path_factory.mktemp("isolated_workspace")
    config_path = test_workspace / "config.toml"
    import tomli_w
    with open(config_path, "wb") as f:
        tomli_w.dump({
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }, f)
    # Point every config/preset lookup at the throwaway workspace.
    monkeypatch.setenv("SLOP_CONFIG", str(config_path))
    monkeypatch.setenv("SLOP_GLOBAL_PRESETS", str(test_workspace / "presets.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_TOOL_PRESETS", str(test_workspace / "tool_presets.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_PERSONAS", str(test_workspace / "personas.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_WORKSPACE_PROFILES", str(test_workspace / "workspace_profiles.toml"))
    yield


@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
    """
    Autouse fixture that resets the paths global state before and after each test.
    """
    from src import paths
    paths.reset_resolved()
    yield
    paths.reset_resolved()


@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
    """
    Autouse fixture that resets the ai_client global state before each test.
    This is critical for preventing state pollution between tests.
    """
    from src import ai_client
    from src import mcp_client
    ai_client.reset_session()
    # Reset callbacks to None or default to ensure no carry-over
    ai_client.confirm_and_run_callback = None
    ai_client.comms_log_callback = None
    ai_client.tool_log_callback = None
    # Clear all event listeners
    ai_client.events.clear()
    # Reset provider/model to defaults
    ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
    # Reset MCP client state
    mcp_client.configure([], [])
    yield
    ai_client.reset_session()


@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Fixture to provide a VerificationLogger instance to a test."""
    test_name = request.node.name
    script_name = Path(request.node.fspath).stem
    return VerificationLogger(test_name, script_name)


def kill_process_tree(pid: int | None) -> None:
    """Robustly kills a process and all its children.

    Best-effort: any error is printed, never raised. A `pid` of None is a no-op.
    """
    if pid is None:
        return
    try:
        print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
        if os.name == 'nt':
            # /F is force, /T is tree (includes children)
            subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
        else:
            pgid = os.getpgid(pid)
            if pgid == os.getpgrp():
                # The target shares OUR process group; killing the group would
                # take the pytest process down with it. Kill only the target.
                os.kill(pid, signal.SIGKILL)
            else:
                # On Unix, kill the process group
                os.killpg(pgid, signal.SIGKILL)
        print(f"[Fixture] Process tree {pid} killed.")
    except Exception as e:
        print(f"[Fixture] Error killing process tree {pid}: {e}")


@pytest.fixture
def mock_app() -> Generator[App, None, None]:
    """
    Mock version of the App for simple unit tests that don't need a loop.
    """
    with (
        patch('src.app_controller.AppController.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('src.app_controller.AppController.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        app = App()
        yield app
        # Teardown runs with the patches still active.
        if hasattr(app, 'controller'):
            app.controller.shutdown()
        elif hasattr(app, 'shutdown'):
            app.shutdown()


@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.

    [C: tests/test_gui2_events.py:test_app_subscribes_to_events]
    """
    with (
        patch('src.app_controller.AppController.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('src.app_controller.AppController.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        app = App()
        yield app
        # Cleanup: Ensure background threads are stopped
        if hasattr(app, 'controller'):
            app.controller.shutdown()
        if hasattr(app, 'shutdown'):
            app.shutdown()


class _LiveGuiHandle:
    """Handle yielded by the live_gui fixture.

    Wraps the GUI subprocess (None for xdist client workers that do not own
    it), the script path and the workspace directory.
    """

    def __init__(self, process: subprocess.Popen | None, gui_script: str, workspace: Path | None = None) -> None:
        """[SDM: tests/conftest.py:_LiveGuiHandle] [C: tests/conftest.py:live_gui fixture]"""
        self._process = process
        self._gui_script = gui_script
        self._workspace = workspace
        self._lock = threading.Lock()
        self._respawn_count = 0

    def __iter__(self):
        """Support tuple unpacking: `process, gui_script = handle` (backward compat)."""
        return iter((self._process, self._gui_script))

    def __getitem__(self, index: int):
        """Support indexing: `handle[0]` returns process, `handle[1]` returns gui_script."""
        if index == 0:
            return self._process
        if index == 1:
            return self._gui_script
        raise IndexError(index)

    @property
    def process(self) -> subprocess.Popen | None:
        """[M: tests/conftest.py:live_gui fixture]"""
        return self._process

    @property
    def gui_script(self) -> str:
        """[M: tests/conftest.py:live_gui fixture]"""
        return self._gui_script

    @property
    def workspace(self) -> Path | None:
        """[M: tests/conftest.py:live_gui fixture] Absolute path to the live_gui workspace (pytest tmp dir)."""
        return self._workspace

    def is_alive(self) -> bool:
        """Returns True if the subprocess is running."""
        return self._process is not None and self._process.poll() is None

    def ensure_alive(self) -> None:
        """No-op stub for Phase 2: the live_gui fixture is session-scoped, so
        we cannot respawn the subprocess in-place. The handle is respawned
        between test sessions. If the process died, the counter is
        incremented for diagnostics."""
        with self._lock:
            if not self.is_alive():
                self._respawn_count += 1

    @property
    def respawn_count(self) -> int:
        """[M: tests/conftest.py:_LiveGuiHandle.ensure_alive]"""
        return self._respawn_count


# Status endpoint of the hook server started by `sloppy.py --enable-test-hooks`.
_HOOK_SERVER_STATUS_URL = "http://127.0.0.1:8999/status"


@pytest.fixture(scope="session")
def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
    """
    Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
    Includes high-signal environment telemetry and workspace isolation.

    Exactly one worker (the owner of the lock file) spawns and later kills
    the subprocess; every other worker yields a handle whose process is None.
    """
    gui_script = os.path.abspath("sloppy.py")
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    # Log the script that is actually launched.
    diag.log_state("GUI Script", "N/A", Path(gui_script).name)

    # 1. Create the run workspace (one per pytest invocation, shared by the
    #    single owner worker that actually runs live_gui tests).
    #    Per-run isolation: each `uv run pytest` gets a fresh folder.
    #    Per-test pollution is INTENTIONAL (exposes fragility, per the
    #    workspace_path_finalize_20260609 spec).
    #    NOTE: do NOT rmtree the workspace here. With xdist, 16 workers share this
    #    path concurrently; an rmtree-by-any-worker would delete the dir between
    #    another worker's mkdir and lock acquisition, causing FileNotFoundError.
    temp_workspace = _RUN_WORKSPACE
    temp_workspace.mkdir(parents=True, exist_ok=True)

    # Create minimal project files to avoid cluttering root
    (temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
    (temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)

    # Create a local config.toml in temp_workspace
    config_content = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {
            'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
            'active': str((temp_workspace / 'manual_slop.toml').absolute())
        },
        'paths': {
            'logs_dir': str((temp_workspace / "logs").absolute()),
            'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
        },
        'tools': {
            'text_editors': {
                'vscode': {
                    'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
                    'diff_args': ['--new-window', '--diff']
                }
            },
            'default_editor': {'default_editor': 'vscode'}
        }
    }
    import tomli_w
    with open(temp_workspace / 'config.toml', 'wb') as f:
        tomli_w.dump(config_content, f)

    # Resolve absolute paths for shared resources
    project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    config_file = temp_workspace / "config.toml"
    cred_file = project_root / "credentials.toml"
    mcp_file = project_root / "mcp_env.toml"

    # Ship a pre-baked default layout into the test workspace so every live_gui
    # session starts with a deterministic, well-organized layout (the user's
    # preferred arrangement: Project/Files/AI/Operations on the left, Discussion/
    # Log Management/Diagnostics on the right). Without this, HelloImGui auto-docks
    # on first launch in a non-deterministic way, and the user's saved repo-root
    # layout references stale pre-hub-refactor window names.
    # To iterate: open sloppy.py interactively, arrange the layout, quit
    # (HelloImGui auto-saves to cwd), then copy manualslop_layout.ini over the
    # artifact below.
    _default_layout_src = project_root / "tests" / "artifacts" / "manualslop_layout_default.ini"
    if _default_layout_src.exists():
        shutil.copy2(_default_layout_src, temp_workspace / "manualslop_layout.ini")

    # Link assets for fonts. Several xdist workers run this setup against the
    # same workspace, so the link may already exist or appear concurrently.
    src_assets = project_root / "assets"
    assets_link = temp_workspace / "assets"
    if src_assets.exists() and not os.path.lexists(assets_link):
        if os.name == 'nt':
            subprocess.run(["cmd", "/c", "mklink", "/D", str(assets_link), str(src_assets)], check=False)
        else:
            try:
                os.symlink(src_assets, assets_link)
            except FileExistsError:
                pass  # another worker created the link first

    # File-based mutex to coordinate sloppy.py ownership across xdist workers.
    # The first worker to acquire the lock spawns sloppy.py. Other workers see
    # the lock and wait for the hook server to come up, then yield a client
    # handle (process=None; teardown skips kill + workspace cleanup). This
    # serializes I/O on port 8999 so we don't try to run live_gui in parallel.
    _worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master")
    _owner_lock = temp_workspace / ".live_gui_owner.lock"
    _is_owner = False
    try:
        # O_EXCL: atomic create-or-fail. If file exists, another worker owns the subprocess.
        # We close the fd immediately; the file's existence on disk IS the lock.
        # Holding the fd open would prevent os.remove() in teardown on Windows.
        _lock_fd = os.open(str(_owner_lock), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(_lock_fd)
    except FileExistsError:
        # Another worker owns it. We are a client. Wait for the hook server to come up.
        print(f"[Fixture] {_worker_id} is a live_gui client (another worker owns the subprocess)")
    else:
        _is_owner = True
        print(f"[Fixture] {_worker_id} acquired live_gui owner lock")
        # Probe the hook port right after taking the lock. A non-200 answer
        # demotes this worker to client and releases the lock; no answer at
        # all (the normal case) keeps it as owner.
        # NOTE(review): a 200 answer here means something already serves port
        # 8999, yet this worker stays owner and spawns anyway - confirm intent.
        try:
            resp = requests.get(_HOOK_SERVER_STATUS_URL, timeout=0.5)
            if resp.status_code != 200:
                print(f"[Fixture] Stale owner lock with no hook server. Demoting {_worker_id} to client.")
                try:
                    os.remove(str(_owner_lock))
                except FileNotFoundError:
                    pass
                _is_owner = False
        except Exception:
            pass

    _process: subprocess.Popen | None = None

    if not _is_owner:
        # Client worker: wait for the owner's hook server, then yield a dummy
        # handle. Clients never kill the subprocess or clean up the workspace.
        _wait_start = time.time()
        connected = False
        while time.time() - _wait_start < 30:
            try:
                resp = requests.get(_HOOK_SERVER_STATUS_URL, timeout=0.5)
                if resp.status_code == 200:
                    print(f"[Fixture] Client {_worker_id} connected to hook server after {round(time.time() - _wait_start, 2)}s.")
                    connected = True
                    break
            except Exception:
                pass
            time.sleep(0.5)
        if not connected:
            # The owner worker failed to start the hook server.
            print(f"[Fixture] Client {_worker_id} timed out waiting for hook server. Yielding null handle.")
        yield _LiveGuiHandle(None, gui_script, temp_workspace)
        return

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
    os.makedirs("logs", exist_ok=True)
    log_file_name = Path(gui_script).name.replace('.', '_')
    log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")

    # Use environment variable to point to temp config if App supports it,
    # or just run from that CWD.
    env = os.environ.copy()
    env["PYTHONPATH"] = str(project_root.absolute())
    if config_file.exists():
        env["SLOP_CONFIG"] = str(config_file.absolute())
    if cred_file.exists():
        env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
    if mcp_file.exists():
        env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
    env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
    env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())

    try:
        _process = subprocess.Popen(
            ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
            stdout=log_file,
            stderr=log_file,
            text=True,
            cwd=str(temp_workspace.absolute()),
            env=env,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0,
            # POSIX counterpart of CREATE_NEW_PROCESS_GROUP: give the child its
            # own process group so kill_process_tree() can kill that group
            # without also killing pytest.
            start_new_session=os.name != 'nt',
        )
    except Exception:
        log_file.close()  # do not leak the handle when the launch itself fails
        raise

    diag.log_state("GUI Process PID", "N/A", _process.pid)

    startup_timeout_s = 15
    ready = False
    print(f"[Fixture] Waiting up to {startup_timeout_s}s for Hook Server on port 8999...")
    start_time = time.time()
    while time.time() - start_time < startup_timeout_s:
        try:
            response = requests.get(_HOOK_SERVER_STATUS_URL, timeout=0.5)
            if response.status_code == 200:
                ready = True
                print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            if _process.poll() is not None:
                print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                break
        time.sleep(0.5)

    diag.log_state("Startup Success", "N/A", str(ready))
    diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")

    if not ready:
        diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
        print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
        kill_process_tree(_process.pid)
        log_file.close()
        pytest.fail(f"Failed to start {gui_script} with test hooks.")

    diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

    try:
        yield _LiveGuiHandle(_process, gui_script, temp_workspace)
    finally:
        # Only the owner reaches this point; clients returned above.
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        # Reset the GUI state before shutting down (best effort)
        try:
            from src.api_hook_client import ApiHookClient
            client = ApiHookClient()
            client.reset_session()
            time.sleep(0.5)
        except Exception as e:
            print(f"[Fixture] reset_session during teardown failed: {type(e).__name__}: {e}")

        if _process.poll() is None:
            kill_process_tree(_process.pid)
            # On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
            # the handle is valid until waited on.
            try:
                _process.wait(timeout=2)
            except Exception as e:
                print(f"[Fixture] wait() after kill failed: {type(e).__name__}: {e}")
            time.sleep(0.5)

        log_file.close()

        # Release the owner lock so the next pytest invocation can become owner.
        try:
            os.remove(str(_owner_lock))
        except FileNotFoundError:
            pass

        # Cleanup temp workspace with retry for Windows file locks
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except OSError:
                time.sleep(0.5)


@pytest.fixture(autouse=True)
def _check_live_gui_health(request) -> Generator[None, None, None]:
    """Run the handle's liveness check for tests that use live_gui.

    live_gui is resolved lazily instead of being declared as a parameter: an
    autouse fixture that depends on live_gui forces the GUI subprocess to be
    spawned for EVERY test and makes the membership check below always true.
    """
    if "live_gui" in request.fixturenames:
        handle = request.getfixturevalue("live_gui")
        handle.ensure_alive()
    yield


@pytest.fixture(autouse=True)
def _reset_clean_baseline(request) -> Generator[None, None, None]:
    """[SDM: tests/conftest.py:_reset_clean_baseline] [C: tests marked with @pytest.mark.clean_baseline]

    For tests marked `clean_baseline`, make sure the live GUI is running and
    reset its session before the test body. Unmarked tests are untouched and
    do not trigger the live_gui fixture.
    """
    if request.node.get_closest_marker("clean_baseline"):
        # Start (or reuse) the session-scoped GUI only when it is needed.
        request.getfixturevalue("live_gui")
        try:
            from src.api_hook_client import ApiHookClient
            client = ApiHookClient()
            if client.wait_for_server(timeout=5):
                client.reset_session()
                time.sleep(0.2)
        except Exception as e:
            import sys
            print(f"[CLEAN_BASELINE] reset failed: {type(e).__name__}: {e}", file=sys.stderr)
    yield


@pytest.fixture
def live_gui_workspace(live_gui) -> Path:
    """[SDM: tests/conftest.py:live_gui_workspace] [C: tests/test_rag_phase4_*.py, tests/test_saved_presets_sim.py, etc.]"""
    handle = live_gui
    return handle.workspace