Private
Public Access
0
0
Files
manual_slop/tests/conftest.py
T
ed 8957c9a5be fix(conftest): register atexit handler for non-blocking pool shutdown
Fixes the run_tests_batched.py hang that occurs after batch 4.
The original conftest (commit 52ea2693) stored _warmup_app_controller
at module scope for the entire pytest session. When pytest exits,
GC of the AppController triggers ThreadPoolExecutor.__del__ ->
shutdown(wait=True). If warmup hasn't fully completed by then, the
shutdown blocks indefinitely, causing the batched test runner to
hang at the subprocess.run boundary.

Fix: register an atexit handler that captures the _io_pool reference
directly (default argument) and shuts it down with wait=False. The
pool reference is captured by closure, surviving even after the
AppController is GC'd. shutdown() is idempotent so the subsequent
shutdown(wait=True) in __del__ is a no-op.

This is part of sub-track 4 (warmup notification) cleanup; the
conftest's wait_for_warmup behavior is preserved, only the
exit-hang is fixed.
2026-06-06 21:35:05 -04:00

442 lines
16 KiB
Python

import pytest
import subprocess
import time
import requests
import os
import signal
import sys
import datetime
import shutil
from pathlib import Path
from typing import Generator, Any
from unittest.mock import patch
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
if thirdparty_dir not in sys.path:
sys.path.insert(0, thirdparty_dir)
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from defer.sugar import install
install()
# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
# and the message in workflow.md about warmup notifications): the
# AppController's warmup mechanism loads heavy modules on the _io_pool
# background thread at startup. Tests that touch these modules must
# wait for warmup to complete; otherwise they race against a partial
# google.genai import and hit "partially initialized" errors.
#
# Wait for the warmup before any test runs. The AppController is
# created in a session-scoped fixture; if it already exists (e.g.,
# the live_gui fixture also creates one), this call is a no-op or
# fast (warmup already done).
#
# FIX (startup_speedup_20260606 sub-track 4 follow-up): The original
# code held `_warmup_app_controller` at module scope for the entire
# pytest session. When pytest exits, GC of the AppController triggers
# ThreadPoolExecutor.__del__ -> shutdown(wait=True). If warmup hasn't
# fully completed, shutdown blocks indefinitely, causing the batched
# test runner to hang after pytest exits.
#
# Fix: register an atexit handler that captures the pool reference
# directly (not the AppController) and shuts it down with wait=False.
# shutdown() is idempotent, so the subsequent shutdown(wait=True) in
# __del__ is a no-op. The pool reference is captured by closure so it
# survives even after the AppController is GC'd.
import atexit
from src.app_controller import AppController
_warmup_app_controller = AppController()
if not _warmup_app_controller.wait_for_warmup(timeout=60.0):
import warnings
warnings.warn(
"AppController warmup did not complete within 60s. "
"Tests that depend on warmup modules (google.genai, anthropic, "
"openai, etc.) may fail.",
RuntimeWarning,
stacklevel=2,
)
_warmup_io_pool = getattr(_warmup_app_controller, "_io_pool", None)
def _shutdown_warmup_pool(pool: object = _warmup_io_pool) -> None:
if pool is not None:
try: pool.shutdown(wait=False)
except Exception: pass
atexit.register(_shutdown_warmup_pool)
from src.gui_2 import App
class VerificationLogger:
def __init__(self, test_name: str, script_name: str) -> None:
self.test_name = test_name
self.script_name = script_name
self.entries = []
self.start_time = time.time()
# Route artifacts to tests/logs/
self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
self.logs_dir.mkdir(parents=True, exist_ok=True)
def log_state(self, field: str, before: Any, after: Any) -> None:
"""
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
"""
delta = ""
if isinstance(before, (int, float)) and isinstance(after, (int, float)):
diff = after - before
delta = f"{'+' if diff > 0 else ''}{diff}"
self.entries.append({
"Field": field,
"Before": str(before),
"After": str(after),
"Delta": delta
})
def finalize(self, title: str, status: str, result_msg: str) -> None:
"""
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
"""
round(time.time() - self.start_time, 2)
log_file = self.logs_dir / f"{self.script_name}.txt"
with open(log_file, "w", encoding="utf-8") as f:
f.write(f"[ Test: {self.test_name} ]\n")
f.write(f"({title})\n\n")
f.write(f"{self.test_name}: before vs after\n")
f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
f.write("-" * 80 + "\n")
for e in self.entries:
f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
f.write("-" * 80 + "\n")
f.write(f"{status} {self.test_name} ({result_msg})\n\n")
print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
@pytest.fixture(autouse=True)
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
"""
Autouse fixture to isolate tests from the active user workspace.
Protects the real config.toml and manual_slop.toml from being overwritten.
"""
test_workspace = tmp_path_factory.mktemp("isolated_workspace")
config_path = test_workspace / "config.toml"
import tomli_w
with open(config_path, "wb") as f:
tomli_w.dump({
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}}
}, f)
monkeypatch.setenv("SLOP_CONFIG", str(config_path))
monkeypatch.setenv("SLOP_GLOBAL_PRESETS", str(test_workspace / "presets.toml"))
monkeypatch.setenv("SLOP_GLOBAL_TOOL_PRESETS", str(test_workspace / "tool_presets.toml"))
monkeypatch.setenv("SLOP_GLOBAL_PERSONAS", str(test_workspace / "personas.toml"))
monkeypatch.setenv("SLOP_GLOBAL_WORKSPACE_PROFILES", str(test_workspace / "workspace_profiles.toml"))
yield
@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
"""
Autouse fixture that resets the paths global state before each test.
"""
from src import paths
paths.reset_resolved()
yield
paths.reset_resolved()
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
"""
Autouse fixture that resets the ai_client global state before each test.
This is critical for preventing state pollution between tests.
"""
from src import ai_client
from src import mcp_client
ai_client.reset_session()
# Reset callbacks to None or default to ensure no carry-over
ai_client.confirm_and_run_callback = None
ai_client.comms_log_callback = None
ai_client.tool_log_callback = None
# Clear all event listeners
ai_client.events.clear()
# Reset provider/model to defaults
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
# Reset MCP client state
mcp_client.configure([], [])
yield
ai_client.reset_session()
@pytest.fixture
def vlogger(request) -> VerificationLogger:
"""Fixture to provide a VerificationLogger instance to a test."""
test_name = request.node.name
script_name = Path(request.node.fspath).stem
return VerificationLogger(test_name, script_name)
def kill_process_tree(pid: int | None) -> None:
"""Robustly kills a process and all its children."""
if pid is None:
return
try:
print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
if os.name == 'nt':
# /F is force, /T is tree (includes children)
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False)
else:
# On Unix, kill the process group
os.killpg(os.getpgid(pid), signal.SIGKILL)
print(f"[Fixture] Process tree {pid} killed.")
except Exception as e:
print(f"[Fixture] Error killing process tree {pid}: {e}")
@pytest.fixture
def mock_app() -> Generator[App, None, None]:
"""
Mock version of the App for simple unit tests that don't need a loop.
"""
with (
patch('src.models.load_config', return_value={
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}}
}),
patch('src.models.save_config'),
patch('src.gui_2.project_manager'),
patch('src.gui_2.session_logger'),
patch('src.gui_2.immapp.run'),
patch('src.app_controller.AppController._load_active_project'),
patch('src.app_controller.AppController._fetch_models'),
patch.object(App, '_load_fonts'),
patch.object(App, '_post_init'),
patch('src.app_controller.AppController._prune_old_logs'),
patch('src.app_controller.AppController.start_services'),
patch('src.app_controller.AppController._init_ai_and_hooks'),
patch('src.performance_monitor.PerformanceMonitor')
):
app = App()
yield app
if hasattr(app, 'controller'):
app.controller.shutdown()
elif hasattr(app, 'shutdown'):
app.shutdown()
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
"""
Centralized App instance with all external side effects mocked.
Matches the pattern used in test_token_viz.py and test_gui_phase4.py.
[C: tests/test_gui2_events.py:test_app_subscribes_to_events]
"""
with (
patch('src.models.load_config', return_value={
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}}
}),
patch('src.models.save_config'),
patch('src.gui_2.project_manager'),
patch('src.gui_2.session_logger'),
patch('src.gui_2.immapp.run'),
patch('src.app_controller.AppController._load_active_project'),
patch('src.app_controller.AppController._fetch_models'),
patch.object(App, '_load_fonts'),
patch.object(App, '_post_init'),
patch('src.app_controller.AppController._prune_old_logs'),
patch('src.app_controller.AppController.start_services'),
patch('src.app_controller.AppController._init_ai_and_hooks'),
patch('src.performance_monitor.PerformanceMonitor')
):
app = App()
yield app
# Cleanup: Ensure background threads are stopped
if hasattr(app, 'controller'):
app.controller.shutdown()
if hasattr(app, 'shutdown'):
app.shutdown()
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
"""
Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
Includes high-signal environment telemetry and workspace isolation.
"""
gui_script = os.path.abspath("sloppy.py")
diag = VerificationLogger("live_gui_startup", "live_gui_diag")
diag.log_state("GUI Script", "N/A", "gui_2.py")
# 1. Create a isolated workspace for the live GUI
temp_workspace = Path("tests/artifacts/live_gui_workspace")
if temp_workspace.exists():
for _ in range(5):
try:
shutil.rmtree(temp_workspace)
break
except PermissionError:
time.sleep(0.5)
# Create the workspace directory before writing files
temp_workspace.mkdir(parents=True, exist_ok=True)
# Create minimal project files to avoid cluttering root
(temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
(temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)
# Create a local config.toml in temp_workspace
config_content = {
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {
'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
'active': str((temp_workspace / 'manual_slop.toml').absolute())
},
'paths': {
'logs_dir': str((temp_workspace / "logs").absolute()),
'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
},
'tools': {
'text_editors': {
'vscode': {
'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
'diff_args': ['--new-window', '--diff']
}
},
'default_editor': {'default_editor': 'vscode'}
}
}
import tomli_w
with open(temp_workspace / 'config.toml', 'wb') as f:
tomli_w.dump(config_content, f)
# Resolve absolute paths for shared resources
project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
config_file = temp_workspace / "config.toml"
cred_file = project_root / "credentials.toml"
mcp_file = project_root / "mcp_env.toml"
# Preserve GUI layout for tests
layout_file = Path("manualslop_layout.ini")
if layout_file.exists():
shutil.copy2(layout_file, temp_workspace / layout_file.name)
# Link assets for fonts
src_assets = project_root / "assets"
if src_assets.exists():
if os.name == 'nt':
subprocess.run(["cmd", "/c", "mklink", "/D", str(temp_workspace / "assets"), str(src_assets)], check=False)
else:
os.symlink(src_assets, temp_workspace / "assets")
# Check if already running (shouldn't be)
try:
resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if resp.status_code == 200:
print("[Fixture] WARNING: Hook Server already up on port 8999. Test state might be polluted.")
# Optionally try to reset it
try: requests.post("http://127.0.0.1:8999/api/gui", json={"action": "click", "item": "btn_reset"}, timeout=1)
except: pass
except: pass
print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
os.makedirs("logs", exist_ok=True)
log_file_name = Path(gui_script).name.replace('.', '_')
log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")
# Use environment variable to point to temp config if App supports it,
# or just run from that CWD.
env = os.environ.copy()
env["PYTHONPATH"] = str(project_root.absolute())
if config_file.exists():
env["SLOP_CONFIG"] = str(config_file.absolute())
if cred_file.exists():
env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
if mcp_file.exists():
env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())
process = subprocess.Popen(
["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
stdout=log_file,
stderr=log_file,
text=True,
cwd=str(temp_workspace.absolute()),
env=env,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
)
diag.log_state("GUI Process PID", "N/A", process.pid)
max_retries = 15
ready = False
print(f"[Fixture] Waiting up to {max_retries}s for Hook Server on port 8999...")
start_time = time.time()
while time.time() - start_time < max_retries:
try:
response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if response.status_code == 200:
ready = True
print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
break
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
if process.poll() is not None:
print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
break
time.sleep(0.5)
diag.log_state("Startup Success", "N/A", str(ready))
diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")
if not ready:
diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
kill_process_tree(process.pid)
pytest.fail(f"Failed to start {gui_script} with test hooks.")
diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")
try:
yield process, gui_script
finally:
print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
# Reset the GUI state before shutting down
try:
from src.api_hook_client import ApiHookClient
client = ApiHookClient()
client.reset_session()
time.sleep(0.5)
except: pass
if process.poll() is None:
kill_process_tree(process.pid)
# On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
# the handle is valid until waited on.
try:
process.wait(timeout=2)
except:
pass
time.sleep(0.5)
log_file.close()
# Cleanup temp workspace with retry for Windows file locks
for _ in range(5):
try:
shutil.rmtree(temp_workspace)
break
except PermissionError:
time.sleep(0.5)
except:
break