9796fe27f4
The unconditional watchdog (91b19c90) was a 90s time.sleep, which fired for ANY batch that ran longer than 90s from conftest load, even legitimate slow live_gui tests. User confirmed: Batch 2 ended at 92.1s because the unconditional watchdog fired mid-test (the smart watchdog's signal had not been set yet because pytest_terminal_summary only runs after all tests are done).
Fix: make the unconditional ALSO signal-based. Both watchdogs now wait for the same _pytest_finished_event. The difference is just the timeout:
- Smart: 300s pytest-hung + 5s grace (handles normal cases)
- Unconditional: 900s pytest-hung + 5s grace (catches extremely long test runs)
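- If the signal never fires, each watchdog calls os._exit(2) when its own timeout expires (the first to time out wins, which is the smart watchdog at 300s as long as its thread is running).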
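The timeout scheme above can be sketched in isolation. This is a minimal illustration, not the conftest code itself: `run_watchdog`, `demo`, `hang_timeout`, `grace`, and the injectable `exit_fn` are hypothetical names introduced so the logic can be exercised without killing the interpreter (the real watchdogs call `os._exit` directly), and the timeouts are shrunk to fractions of a second.

```python
import threading
import time
from typing import Callable


def run_watchdog(
    finished: threading.Event,
    hang_timeout: float,
    grace: float,
    exit_fn: Callable[[int], None],
) -> None:
    """Wait for `finished`; report 2 on timeout, else 0 after a grace period."""
    if not finished.wait(timeout=hang_timeout):
        exit_fn(2)  # signal never arrived: treat the run as hung
        return
    time.sleep(grace)  # signal arrived: allow normal finalization first
    exit_fn(0)


def demo(set_event: bool) -> list[int]:
    """Run a short-timeout and a long-timeout watchdog on one shared event."""
    finished = threading.Event()
    codes: list[int] = []  # stands in for os._exit so both results are visible
    threads = [
        threading.Thread(
            target=run_watchdog,
            args=(finished, hang_timeout, 0.01, codes.append),
            daemon=True,
        )
        for hang_timeout in (0.05, 0.15)  # scaled-down stand-ins for 300s / 900s
    ]
    for thread in threads:
        thread.start()
    if set_event:
        finished.set()  # what pytest_terminal_summary does in the conftest
    for thread in threads:
        thread.join()
    return codes


print(demo(set_event=True))   # [0, 0]
print(demo(set_event=False))  # [2, 2]
```

In the sketch both threads get to report because `exit_fn` only records a code. With a real `os._exit`, the first call ends the process, so only the thread whose timer expires first ever acts.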
Why 900s for unconditional: pytest_terminal_summary fires AFTER the summary print. For a normal batch, that's ~32s. For an extremely long batch (e.g., 10+ minutes of slow tests), we want to wait the full duration before declaring it hung. 900s = 15 min is a safe upper bound; the run_tests_batched.py subprocess.run(timeout=1000) is the final safety net for catastrophic hangs.
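The runner-level safety net mentioned above can be illustrated as follows. This is a sketch under assumptions: `run_batch` and its return labels are hypothetical, and the actual contents of run_tests_batched.py are not shown here; only `subprocess.run(..., timeout=...)` and the exceptions it raises are standard-library behavior.

```python
import subprocess
import sys


def run_batch(cmd: list[str], timeout_s: float) -> str:
    """Run one batch and classify it as 'passed', 'failed', or 'hung'."""
    try:
        # check=True raises CalledProcessError on a non-zero exit code;
        # timeout kills the child and raises TimeoutExpired if it overruns.
        subprocess.run(cmd, check=True, timeout=timeout_s)
    except subprocess.TimeoutExpired:
        return "hung"
    except subprocess.CalledProcessError:
        return "failed"
    return "passed"


if __name__ == "__main__":
    # A child that exits with code 2, as a timed-out watchdog would.
    print(run_batch([sys.executable, "-c", "raise SystemExit(2)"], 30.0))
```

Because the in-process watchdogs exit with code 2 on a hang, a runner shaped like this would report a watchdog exit as "failed" and reserve "hung" for the case where neither watchdog acted before the runner's own timeout.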
Two-thread design is intentional (redundant safety). If one thread is somehow blocked, the other fires. The grace period is 5s for both, so the first to fire wins the race.
537 lines
21 KiB
Python
import pytest
import subprocess
import time
import requests
import os
import signal
import sys
import datetime
import shutil
from pathlib import Path
from typing import Generator, Any
from unittest.mock import patch

thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
if thirdparty_dir not in sys.path:
    sys.path.insert(0, thirdparty_dir)

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from defer.sugar import install
install()

# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
# and the message in workflow.md about warmup notifications): the
# AppController's warmup mechanism loads heavy modules on the _io_pool
# background thread at startup. Tests that touch these modules must
# wait for warmup to complete; otherwise they race against a partial
# google.genai import and hit "partially initialized" errors.
#
# Wait for the warmup before any test runs. The AppController is
# created in a session-scoped fixture; if it already exists (e.g.,
# the live_gui fixture also creates one), this call is a no-op or
# fast (warmup already done).
#
# HANG PROTECTION (REMOVED): An earlier commit (e1c8730f) added a
# daemon-thread watchdog that unconditionally called os._exit(0) after
# 30s. The intent was to bound hangs from ThreadPoolExecutor.__del__
# and the live_gui fixture teardown. Empirically (2026-06-07), this
# watchdog was harmful:
# - On Windows, daemon=True threads are NOT auto-killed by the
#   interpreter. The watchdog's time.sleep(30) continues through
#   pytest's normal shutdown, then os._exit(0) fires.
# - For batches that take >30s (e.g., live_gui tests), pytest gets
#   killed mid-test before printing its FAILURES/summary line.
# - The os._exit(0) hides pytest's actual exit code, so the
#   run_tests_batched.py runner reports 'Batch N passed' even when
#   tests had failed (e.g., 5 F's in test_ticket_queue).
#
# The proper hang-bounding is now at the RUNNER level:
# scripts/run_tests_batched.py uses subprocess.run(timeout=1000) per
# batch. If pytest hangs, the runner kills it after 1000s and reports
# failure. Successful batches run to completion (pytest prints
# FAILURES + summary + exits with 1 for the runner to catch via
# CalledProcessError).
import atexit
from src.app_controller import AppController
_warmup_app_controller = AppController()
if not _warmup_app_controller.wait_for_warmup(timeout=60.0):
    import warnings
    warnings.warn(
        "AppController warmup did not complete within 60s. "
        "Tests that depend on warmup modules (google.genai, anthropic, "
        "openai, etc.) may fail.",
        RuntimeWarning,
        stacklevel=2,
    )

# HANG PROTECTION (signal-based watchdog). Two observed hang chains
# from e1c8730f and the prior naive watchdog:
# 1. ThreadPoolExecutor.__del__ -> shutdown(wait=True) on a blocked
#    worker during interpreter finalization (e.g., the io_pool
#    created in AppController.__init__ at conftest line 65).
# 2. The session-scoped `live_gui` fixture teardown hanging in
#    client.reset_session() (HTTP call to the hook server) or
#    kill_process_tree(process.pid) / process.wait(timeout=2) waiting
#    for the sloppy.py subprocess to die on Windows.
# The naive 30s os._exit(0) approach CUT OFF BATCHES MID-TEST (every
# batch exited at 32.0s exactly). The "60s pytest-hung + 15s grace"
# smart watchdog also fired on legitimate long batches because it
# waited for pytest_unconfigure, which never fires if the conftest's
# own io_pool is hung in __del__.
#
# CORRECT approach: signal-based. Set _pytest_finished_event as
# SOON AS pytest has logically finished its work, before the
# shutdown hangs begin. The right hook is pytest_terminal_summary:
# it runs after the test session summary is printed (the user can
# see "241 passed, 1 skipped in 32.30s" in the output) but BEFORE
# finalization. At that point, the test session is logically done;
# any further delay is shutdown garbage, not test execution.
#
# Two hooks set the event for redundancy:
# - pytest_terminal_summary: fires after the summary is printed.
#   This is the primary signal.
# - pytest_unconfigure: fires at the very end, after the summary.
#   Fallback in case the terminal summary hook isn't reached (e.g.,
#   pytest crashes mid-summary).
# After the event fires, give 5s for normal finalization, then
# os._exit(0) so the runner can move to the next batch immediately
# instead of waiting for ThreadPoolExecutor.__del__ to unblock
# (which can take 60+ seconds).
#
# For TRUE hangs (event never set), the smart watchdog calls
# os._exit(2) after 300s (5 minutes). The unconditional watchdog
# below waits 900s on the same event and is a redundant safety net
# in case the smart thread is blocked. Together they cover the case
# where the conftest itself hangs in wait_for_warmup before any
# tests run, or pytest never reaches the summary phase.
import threading
_pytest_finished_event: threading.Event = threading.Event()

def pytest_terminal_summary(terminalreporter: object, exitstatus: int, config: object) -> None:
    _pytest_finished_event.set()

def pytest_unconfigure(config: object) -> None:
    _pytest_finished_event.set()

def _smart_watchdog_exit() -> None:
    if not _pytest_finished_event.wait(timeout=300.0):
        os._exit(2)
    import time
    time.sleep(5.0)
    os._exit(0)

threading.Thread(target=_smart_watchdog_exit, daemon=True, name="conftest-smart-watchdog").start()

def _unconditional_watchdog_exit() -> None:
    """Hard fail-safe: also signal-based, but with a much longer
    timeout than the smart watchdog.

    The smart watchdog (above) uses 300s. This sledgehammer waits
    900s (15 minutes) for the same signal. Because the smart
    watchdog already calls os._exit(2) at 300s when the signal has
    not fired, this thread only decides the outcome if the smart
    thread is blocked or never ran. In that case we still want the
    runner to move on.

    If the signal never fires (true hang), os._exit(2) so the runner
    catches it as CalledProcessError.
    """
    if not _pytest_finished_event.wait(timeout=900.0):
        os._exit(2)
    import time
    time.sleep(5.0)
    os._exit(0)

threading.Thread(target=_unconditional_watchdog_exit, daemon=True, name="conftest-unconditional-watchdog").start()

from src.gui_2 import App

class VerificationLogger:
    def __init__(self, test_name: str, script_name: str) -> None:
        self.test_name = test_name
        self.script_name = script_name
        self.entries = []
        self.start_time = time.time()
        # Route artifacts to tests/logs/
        self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    def log_state(self, field: str, before: Any, after: Any) -> None:
        """
        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        delta = ""
        if isinstance(before, (int, float)) and isinstance(after, (int, float)):
            diff = after - before
            delta = f"{'+' if diff > 0 else ''}{diff}"
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": delta
        })

    def finalize(self, title: str, status: str, result_msg: str) -> None:
        """
        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        round(time.time() - self.start_time, 2)
        log_file = self.logs_dir / f"{self.script_name}.txt"
        with open(log_file, "w", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({title})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write("-" * 80 + "\n")
            for e in self.entries:
                f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
            f.write("-" * 80 + "\n")
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")

@pytest.fixture(autouse=True)
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
    """
    Autouse fixture to isolate tests from the active user workspace.
    Protects the real config.toml and manual_slop.toml from being overwritten.
    """
    test_workspace = tmp_path_factory.mktemp("isolated_workspace")

    config_path = test_workspace / "config.toml"
    import tomli_w
    with open(config_path, "wb") as f:
        tomli_w.dump({
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }, f)

    monkeypatch.setenv("SLOP_CONFIG", str(config_path))
    monkeypatch.setenv("SLOP_GLOBAL_PRESETS", str(test_workspace / "presets.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_TOOL_PRESETS", str(test_workspace / "tool_presets.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_PERSONAS", str(test_workspace / "personas.toml"))
    monkeypatch.setenv("SLOP_GLOBAL_WORKSPACE_PROFILES", str(test_workspace / "workspace_profiles.toml"))

    yield

@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
    """
    Autouse fixture that resets the paths global state before each test.
    """
    from src import paths
    paths.reset_resolved()
    yield
    paths.reset_resolved()

@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
    """
    Autouse fixture that resets the ai_client global state before each test.
    This is critical for preventing state pollution between tests.
    """
    from src import ai_client
    from src import mcp_client
    ai_client.reset_session()
    # Reset callbacks to None or default to ensure no carry-over
    ai_client.confirm_and_run_callback = None
    ai_client.comms_log_callback = None
    ai_client.tool_log_callback = None
    # Clear all event listeners
    ai_client.events.clear()
    # Reset provider/model to defaults
    ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
    # Reset MCP client state
    mcp_client.configure([], [])
    yield
    ai_client.reset_session()

@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Fixture to provide a VerificationLogger instance to a test."""
    test_name = request.node.name
    script_name = Path(request.node.fspath).stem
    return VerificationLogger(test_name, script_name)

def kill_process_tree(pid: int | None) -> None:
    """Robustly kills a process and all its children."""
    if pid is None:
        return
    try:
        print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
        if os.name == 'nt':
            # /F is force, /T is tree (includes children)
            subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL,
                           check=False)
        else:
            # On Unix, kill the process group
            os.killpg(os.getpgid(pid), signal.SIGKILL)
        print(f"[Fixture] Process tree {pid} killed.")
    except Exception as e:
        print(f"[Fixture] Error killing process tree {pid}: {e}")

@pytest.fixture
def mock_app() -> Generator[App, None, None]:
    """
    Mock version of the App for simple unit tests that don't need a loop.
    """
    with (
        patch('src.models.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        app = App()
        yield app
        if hasattr(app, 'controller'):
            app.controller.shutdown()
        elif hasattr(app, 'shutdown'):
            app.shutdown()

@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.
    [C: tests/test_gui2_events.py:test_app_subscribes_to_events]
    """
    with (
        patch('src.models.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        app = App()
        yield app
        # Cleanup: Ensure background threads are stopped
        if hasattr(app, 'controller'):
            app.controller.shutdown()

        if hasattr(app, 'shutdown'):
            app.shutdown()

@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
    """
    Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
    Includes high-signal environment telemetry and workspace isolation.
    """
    gui_script = os.path.abspath("sloppy.py")
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    diag.log_state("GUI Script", "N/A", "gui_2.py")

    # 1. Create an isolated workspace for the live GUI
    temp_workspace = Path("tests/artifacts/live_gui_workspace")
    if temp_workspace.exists():
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
    # Create the workspace directory before writing files
    temp_workspace.mkdir(parents=True, exist_ok=True)

    # Create minimal project files to avoid cluttering root
    (temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
    (temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)

    # Create a local config.toml in temp_workspace
    config_content = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {
            'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
            'active': str((temp_workspace / 'manual_slop.toml').absolute())
        },
        'paths': {
            'logs_dir': str((temp_workspace / "logs").absolute()),
            'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
        },
        'tools': {
            'text_editors': {
                'vscode': {
                    'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
                    'diff_args': ['--new-window', '--diff']
                }
            },
            'default_editor': {'default_editor': 'vscode'}
        }
    }
    import tomli_w
    with open(temp_workspace / 'config.toml', 'wb') as f:
        tomli_w.dump(config_content, f)

    # Resolve absolute paths for shared resources
    project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    config_file = temp_workspace / "config.toml"

    cred_file = project_root / "credentials.toml"
    mcp_file = project_root / "mcp_env.toml"

    # Preserve GUI layout for tests
    layout_file = Path("manualslop_layout.ini")
    if layout_file.exists():
        shutil.copy2(layout_file, temp_workspace / layout_file.name)

    # Link assets for fonts
    src_assets = project_root / "assets"
    if src_assets.exists():
        if os.name == 'nt':
            subprocess.run(["cmd", "/c", "mklink", "/D", str(temp_workspace / "assets"), str(src_assets)], check=False)
        else:
            os.symlink(src_assets, temp_workspace / "assets")

    # Check if already running (shouldn't be). If stale, kill the old process
    # before spawning a new one; otherwise the new subprocess fails to bind
    # port 8999 and the wait loop connects to the stale process instead,
    # leading to state pollution across batches.
    try:
        resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
        if resp.status_code == 200:
            print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...")
            netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5)
            stale_pids: set[int] = set()
            for line in netstat.stdout.splitlines():
                if ":8999" in line and "LISTENING" in line:
                    parts = line.split()
                    if parts:
                        try: stale_pids.add(int(parts[-1]))
                        except ValueError: pass
            for pid in stale_pids:
                try:
                    subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5)
                    print(f"[Fixture] Killed stale PID {pid}")
                except Exception: pass
            time.sleep(1.0)
            print("[Fixture] Proceeding with fresh sloppy.py spawn")
    except Exception: pass

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
    os.makedirs("logs", exist_ok=True)
    log_file_name = Path(gui_script).name.replace('.', '_')
    log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")

    # Use environment variable to point to temp config if App supports it,
    # or just run from that CWD.
    env = os.environ.copy()
    env["PYTHONPATH"] = str(project_root.absolute())
    if config_file.exists():
        env["SLOP_CONFIG"] = str(config_file.absolute())
    if cred_file.exists():
        env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
    if mcp_file.exists():
        env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
    env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
    env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())

    process = subprocess.Popen(
        ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
        stdout=log_file,
        stderr=log_file,
        text=True,
        cwd=str(temp_workspace.absolute()),
        env=env,
        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
    )

    diag.log_state("GUI Process PID", "N/A", process.pid)

    max_retries = 15
    ready = False
    print(f"[Fixture] Waiting up to {max_retries}s for Hook Server on port 8999...")
    start_time = time.time()
    while time.time() - start_time < max_retries:
        try:
            response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
            if response.status_code == 200:
                ready = True
                print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            if process.poll() is not None:
                print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                break
        time.sleep(0.5)

    diag.log_state("Startup Success", "N/A", str(ready))
    diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")

    if not ready:
        diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
        print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
        kill_process_tree(process.pid)
        pytest.fail(f"Failed to start {gui_script} with test hooks.")

    diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

    try:
        yield process, gui_script
    finally:
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        # Reset the GUI state before shutting down
        try:
            from src.api_hook_client import ApiHookClient
            client = ApiHookClient()
            client.reset_session()
            time.sleep(0.5)
        except Exception: pass

        if process.poll() is None:
            kill_process_tree(process.pid)
            # On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
            # the handle is valid until waited on.
            try:
                process.wait(timeout=2)
            except Exception:
                pass

        time.sleep(0.5)
        log_file.close()
        # Cleanup temp workspace with retry for Windows file locks
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
            except Exception:
                break