261 lines
8.8 KiB
Python
261 lines
8.8 KiB
Python
import pytest
|
|
import asyncio
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
import os
|
|
import signal
|
|
import sys
|
|
import datetime
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Generator, Any
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
# Ensure project root is in path for imports
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
|
|
# Import the App class after patching if necessary, but here we just need the type hint
|
|
from gui_2 import App
|
|
|
|
class VerificationLogger:
    """Collect before/after observations for a test and write them to a text report.

    Constructing an instance creates a timestamped directory under
    ``tests/logs/`` (relative to the current working directory);
    ``finalize`` writes ``<script_name>.txt`` into that directory.
    """

    def __init__(self, test_name: str, script_name: str) -> None:
        """Record the test identity and create the log directory.

        Args:
            test_name: Name used in the report header and the final console line.
            script_name: File stem of the report written by ``finalize``.
        """
        self.test_name = test_name
        self.script_name = script_name
        # One dict per logged row; every value is already stringified.
        self.entries: list[dict[str, str]] = []
        self.start_time = time.time()
        # Route artifacts to tests/logs/<YYYYmmdd_HHMMSS>/
        self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for ints/floats, excluding bools (which subclass int)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def log_state(self, field: str, before: Any, after: Any) -> None:
        """Append one before/after row to the report.

        A signed delta (``after - before``) is recorded only when both values
        are real numbers. Booleans are deliberately excluded so that a
        ``False -> True`` transition is not reported as ``+1``.

        Args:
            field: Label of the observed value.
            before: Value prior to the action; stored as ``str(before)``.
            after: Value following the action; stored as ``str(after)``.
        """
        delta = ""
        if self._is_number(before) and self._is_number(after):
            diff = after - before
            delta = f"{'+' if diff > 0 else ''}{diff}"
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": delta,
        })

    def finalize(self, title: str, status: str, result_msg: str) -> None:
        """Write the collected rows to ``<logs_dir>/<script_name>.txt`` and print a summary.

        The file is opened in write mode, so an existing report with the same
        name in the same directory is overwritten.

        Args:
            title: Parenthesised subtitle placed under the report header.
            status: Status token placed at the start of the result line.
            result_msg: Free-text result appended to the result line and the console summary.
        """
        log_file = self.logs_dir / f"{self.script_name}.txt"
        rule = "-" * 80 + "\n"
        with open(log_file, "w", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({title})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write(rule)
            # Fixed-width columns; longer values simply push the row wider.
            f.writelines(
                f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n"
                for e in self.entries
            )
            f.write(rule)
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
|
|
|
|
@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Build a VerificationLogger named after the requesting test and its module file."""
    node = request.node
    # Report header uses the test's node name; the report file uses the test module's stem.
    return VerificationLogger(node.name, Path(node.fspath).stem)
|
|
|
|
def kill_process_tree(pid: int | None) -> None:
|
|
"""Robustly kills a process and all its children."""
|
|
if pid is None:
|
|
return
|
|
try:
|
|
print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
|
|
if os.name == 'nt':
|
|
# /F is force, /T is tree (includes children)
|
|
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
check=False)
|
|
else:
|
|
# On Unix, kill the process group
|
|
os.killpg(os.getpgid(pid), signal.SIGKILL)
|
|
print(f"[Fixture] Process tree {pid} killed.")
|
|
except Exception as e:
|
|
print(f"[Fixture] Error killing process tree {pid}: {e}")
|
|
|
|
@pytest.fixture
def mock_app() -> Generator[App, None, None]:
    """
    Yield an App constructed with config loading, persistence, and its
    startup helper methods patched out.

    Meant for simple unit tests that don't need a loop. The patches stay
    active for the whole test, and ``shutdown`` is called afterwards if the
    instance provides it.
    """
    stub_config = {
        "ai": {"provider": "gemini", "model": "gemini-2.5-flash-lite"},
        "projects": {"paths": [], "active": ""},
        "gui": {"show_windows": {}},
    }
    with (
        # Module-level collaborators of gui_2
        patch("gui_2.load_config", return_value=stub_config),
        patch("gui_2.save_config"),
        patch("gui_2.project_manager"),
        patch("gui_2.session_logger"),
        patch("gui_2.immapp.run"),
        # App methods replaced with mocks for the duration of the test
        patch.object(App, "_load_active_project"),
        patch.object(App, "_fetch_models"),
        patch.object(App, "_load_fonts"),
        patch.object(App, "_post_init"),
        patch.object(App, "_prune_old_logs"),
        patch.object(App, "_init_ai_and_hooks"),
        patch("gui_2.PerformanceMonitor"),
    ):
        instance = App()
        yield instance
        # Teardown runs while the patches are still in place.
        if hasattr(instance, "shutdown"):
            instance.shutdown()
|
|
|
|
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.

    Teardown is best effort: it calls ``app.shutdown()`` when present, then
    cancels unfinished tasks on ``app._loop``, asks a still-running loop to
    stop, and closes the loop if it is no longer running. Problems during
    loop cleanup are printed instead of raised so they cannot mask the
    result of the test itself.
    """
    with (
        patch('gui_2.load_config', return_value={
            'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
            'projects': {'paths': [], 'active': ''},
            'gui': {'show_windows': {}}
        }),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch.object(App, '_prune_old_logs'),
        patch.object(App, '_init_ai_and_hooks'),
        patch('gui_2.PerformanceMonitor')
    ):
        app = App()
        yield app

        # Cleanup: Ensure background threads and asyncio loop are stopped
        if hasattr(app, 'shutdown'):
            app.shutdown()

        # getattr + None check also covers an App whose _loop was never created.
        loop = getattr(app, '_loop', None)
        if loop is not None and not loop.is_closed():
            pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
            if pending:
                try:
                    if loop.is_running():
                        # A loop that is running while this teardown executes is
                        # presumably driven by another thread. asyncio objects are
                        # not thread-safe, so hand the cancellations and the stop
                        # request to the loop itself.
                        for task in pending:
                            loop.call_soon_threadsafe(task.cancel)
                        loop.call_soon_threadsafe(loop.stop)
                    else:
                        # Loop is idle: cancelling directly is safe.
                        for task in pending:
                            task.cancel()
                except Exception as exc:
                    # e.g. the loop was closed between the checks above
                    print(f"[Fixture] app_instance: could not cancel/stop loop cleanly: {exc}")

            # Finally close the loop if we can (close() refuses a running loop)
            try:
                if not loop.is_running():
                    loop.close()
            except Exception as exc:
                print(f"[Fixture] app_instance: could not close event loop: {exc}")
|
|
|
|
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
    """
    Session-scoped fixture that starts gui_2.py with --enable-test-hooks.
    Includes high-signal environment telemetry and workspace isolation.

    The GUI is launched via ``uv run python -u`` with its working directory
    set to a freshly created ``tests/artifacts/live_gui_workspace`` and its
    stdout/stderr redirected to ``logs/<script>_test.log``. The fixture then
    polls ``http://127.0.0.1:8999/status`` for up to 15 seconds.

    Yields:
        ``(process, gui_script)``: the ``Popen`` handle and the absolute
        path of the launched script.

    Fails the test session (``pytest.fail``) if the hook server does not
    answer in time or the process exits during startup. Whether startup
    succeeded or not, the child process is killed and reaped, the log file
    handle is closed, and the temporary workspace is removed.
    """
    status_url = "http://127.0.0.1:8999/status"
    gui_script = os.path.abspath("gui_2.py")
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    diag.log_state("GUI Script", "N/A", "gui_2.py")

    # 1. Create an isolated workspace for the live GUI
    temp_workspace = Path("tests/artifacts/live_gui_workspace")
    if temp_workspace.exists():
        shutil.rmtree(temp_workspace)
    temp_workspace.mkdir(parents=True, exist_ok=True)

    # Create dummy config and project files to avoid cluttering root
    (temp_workspace / "config.toml").write_text("[projects]\npaths = []\nactive = ''\n", encoding="utf-8")
    (temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n", encoding="utf-8")
    (temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)

    # Preserve GUI layout for tests
    layout_file = Path("manualslop_layout.ini")
    if layout_file.exists():
        shutil.copy2(layout_file, temp_workspace / layout_file.name)

    # Check if already running (shouldn't be)
    try:
        already_up = requests.get(status_url, timeout=0.1).status_code == 200
    except requests.exceptions.RequestException:
        already_up = False
    diag.log_state("Hook Server Port 8999", "Down", "UP" if already_up else "Down")

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
    os.makedirs("logs", exist_ok=True)
    log_file_name = Path(gui_script).name.replace('.', '_')

    # Use environment variable to point to temp config if App supports it,
    # or just run from that CWD.
    env = os.environ.copy()
    env["PYTHONPATH"] = os.getcwd()

    process: subprocess.Popen | None = None
    ready = False
    log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")
    # Everything from here on runs under try/finally so the log handle, the
    # child process and the workspace are released on the failure path too.
    try:
        process = subprocess.Popen(
            ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
            stdout=log_file,
            stderr=log_file,
            text=True,
            cwd=str(temp_workspace.absolute()),
            env=env,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0,
            # On POSIX, give the GUI its own session/process group so that the
            # process-group kill done by kill_process_tree() cannot hit the
            # pytest process itself.
            start_new_session=(os.name != 'nt'),
        )

        diag.log_state("GUI Process PID", "N/A", process.pid)

        startup_timeout_s = 15
        print(f"[Fixture] Waiting up to {startup_timeout_s}s for Hook Server on port 8999...")
        # Monotonic clock: unaffected by wall-clock adjustments during the wait.
        start_time = time.monotonic()
        while time.monotonic() - start_time < startup_timeout_s:
            try:
                response = requests.get(status_url, timeout=0.5)
                if response.status_code == 200:
                    ready = True
                    print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.monotonic() - start_time, 2)}s.")
                    break
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Server not reachable yet; give up early if the process is gone.
                if process.poll() is not None:
                    print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                    break
            time.sleep(0.5)

        diag.log_state("Startup Success", "N/A", str(ready))
        diag.log_state("Startup Time", "N/A", f"{round(time.monotonic() - start_time, 2)}s")

        if not ready:
            diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
            print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
            # The finally block below kills the process and releases resources.
            pytest.fail(f"Failed to start {gui_script} with test hooks.")

        diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

        yield process, gui_script
    finally:
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        if ready:
            # Reset the GUI state before shutting down (best effort)
            try:
                from api_hook_client import ApiHookClient
                client = ApiHookClient()
                client.reset_session()
                time.sleep(0.5)
            except Exception as exc:
                print(f"[Fixture] Could not reset GUI session before shutdown: {exc}")
        if process is not None:
            kill_process_tree(process.pid)
            # Reap the child so it does not linger as a zombie and its handles
            # on the log file / workspace are released before cleanup.
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                print(f"[Fixture] PID {process.pid} did not exit within 5s after kill.")
        log_file.close()
        # Cleanup temp workspace (best effort)
        shutil.rmtree(temp_workspace, ignore_errors=True)
|