719c5e274a
The conftest watchdog (e1c8730f) used os._exit(0) after the 30s sleep. run_tests_batched.py calls subprocess.run(check=True) and only prints 'Batch N failed.' when the subprocess exits non-zero. Exit 0 hid the failure: pytest got killed mid-test, the FAILURES section never printed, and the runner silently moved to the next batch. The 'Total batches with failures: 1' summary at the end was therefore undercounting.
Fix: os._exit(0) -> os._exit(2). Code 2 is the standard 'interrupted by signal/timeout' code; pytest also uses it for Ctrl-C. The batched runner now correctly reports a non-zero exit as a failure.
Test updated (docstring) to document the new contract. 3/3 test_conftest_watchdog.py still pass.
473 lines
18 KiB
Python
473 lines
18 KiB
Python
import pytest
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
import os
|
|
import signal
|
|
import sys
|
|
import datetime
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Generator, Any
|
|
from unittest.mock import patch
|
|
|
|
# Import-path bootstrap: expose the sibling "thirdparty" directory and the
# directory one level above this file to the import system.
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Order matters: each missing entry is pushed to the front, so the entry
# handled last (project_root) ends up ahead of thirdparty_dir.
for _extra_path in (thirdparty_dir, project_root):
    if _extra_path not in sys.path:
        sys.path.insert(0, _extra_path)
del _extra_path
|
|
|
|
from defer.sugar import install
|
|
install()
|
|
|
|
# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
|
|
# and the message in workflow.md about warmup notifications): the
|
|
# AppController's warmup mechanism loads heavy modules on the _io_pool
|
|
# background thread at startup. Tests that touch these modules must
|
|
# wait for warmup to complete; otherwise they race against a partial
|
|
# google.genai import and hit "partially initialized" errors.
|
|
#
|
|
# Wait for the warmup before any test runs. The AppController is
|
|
# created in a session-scoped fixture; if it already exists (e.g.,
|
|
# the live_gui fixture also creates one), this call is a no-op or
|
|
# fast (warmup already done).
|
|
#
|
|
# HANG PROTECTION: The run_tests_batched.py runner hangs at the end
|
|
# of a batch when the pytest subprocess fails to exit cleanly. Two
|
|
# hang chains have been observed:
|
|
# 1. ThreadPoolExecutor.__del__ -> shutdown(wait=True) joining a
|
|
# blocked worker (concurrent.futures._python_exit, pool __del__,
|
|
# etc.). An earlier atexit fix at commit 8957c9a5 attempted to
|
|
# preempt this; verified empirically that atexit handlers do NOT
|
|
# fire at all when a pool worker is blocked in user code, so the
|
|
# fix is ineffective (see src/io_pool.py module docstring).
|
|
# 2. The session-scoped `live_gui` fixture teardown (conftest.py:~451)
|
|
# hangs in client.reset_session() (HTTP call to the hook server)
|
|
# or kill_process_tree(process.pid) / process.wait(timeout=2)
|
|
# (waiting for the sloppy.py subprocess to die on Windows).
|
|
# Both chains keep the pytest subprocess alive indefinitely, which
|
|
# makes run_tests_batched.py hang at subprocess.run() waiting for the
|
|
# child to exit.
|
|
#
|
|
# Solution: a daemon-thread watchdog that unconditionally calls
|
|
# os._exit(2) after a generous timeout. If pytest exits cleanly
|
|
# first, the thread is killed when the process tears down
|
|
# (daemon=True). If pytest hangs, the watchdog kicks in and the
|
|
# batched runner can move to the next batch. 30s timeout: batches
|
|
# 1-3 in the user's run completed in 1-5s of test execution; 30s
|
|
# leaves headroom for slow batches while bounding the worst-case
|
|
# hang at half a minute. See src/app_controller.py:_install_sigint_exit_handler
|
|
# for the same pattern (SIGINT + os._exit(0)) applied to the
|
|
# production Ctrl+C path.
|
|
import atexit
|
|
from src.app_controller import AppController
|
|
# Create an AppController at import time and block (up to 60 seconds) until
# its warmup reports completion; a falsy result only produces a warning so
# the test session still starts.
_warmup_app_controller = AppController()
_warmup_finished = _warmup_app_controller.wait_for_warmup(timeout=60.0)
if not _warmup_finished:
    import warnings

    warnings.warn(
        "AppController warmup did not complete within 60s. "
        "Tests that depend on warmup modules (google.genai, anthropic, "
        "openai, etc.) may fail.",
        RuntimeWarning,
        stacklevel=2,
    )
|
|
|
|
def _watchdog_exit() -> None:
    """Sleep for 30 seconds, then terminate the process with exit status 2.

    Intended to run on a daemon thread: if the interpreter exits normally
    before the sleep ends, the thread dies with it and nothing happens.
    """
    import time as _time

    hang_budget_s = 30.0
    _time.sleep(hang_budget_s)
    # os._exit ends the process immediately, without running cleanup
    # handlers, so a blocked teardown cannot keep the process alive.
    os._exit(2)


import threading

# Daemon thread: it never prevents a normal interpreter shutdown.
_hang_watchdog = threading.Thread(
    target=_watchdog_exit,
    name="conftest-hang-watchdog",
    daemon=True,
)
_hang_watchdog.start()
|
|
|
|
from src.gui_2 import App
|
|
|
|
class VerificationLogger:
    """Collect before/after observations for one test and write a text report.

    Each instance creates (if needed) a timestamped directory under
    ``tests/logs/`` at construction time; ``finalize`` writes
    ``<script_name>.txt`` into that directory.
    """

    def __init__(self, test_name: str, script_name: str) -> None:
        """Record the test identity, start the timer and create the log directory.

        Args:
            test_name: Name printed in the report header and the final summary.
            script_name: Stem of the report file written by ``finalize``.
        """
        self.test_name = test_name
        self.script_name = script_name
        # One dict per log_state() call, with string values ready for formatting.
        self.entries: list[dict[str, str]] = []
        self.start_time = time.time()
        # Seconds (rounded to 2 decimals) between construction and the most
        # recent finalize() call; None until finalize() has run.
        self.elapsed_s: float | None = None
        # Route artifacts to tests/logs/
        self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    def log_state(self, field: str, before: Any, after: Any) -> None:
        """
        Append one before/after row to the pending report.

        When both values are ints or floats the signed difference
        ``after - before`` is stored as the delta (positive values get a
        leading ``+``); otherwise the delta is an empty string.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        delta = ""
        if isinstance(before, (int, float)) and isinstance(after, (int, float)):
            diff = after - before
            delta = f"{'+' if diff > 0 else ''}{diff}"
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": delta
        })

    def finalize(self, title: str, status: str, result_msg: str) -> None:
        """
        Write the collected rows to ``<logs_dir>/<script_name>.txt`` and print a summary.

        Also stores the elapsed time since construction in ``self.elapsed_s``
        (previously this value was computed and thrown away).

        Args:
            title: Subtitle written in parentheses under the report header.
            status: Status word placed at the start of the closing line.
            result_msg: Short message appended to the closing line.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        self.elapsed_s = round(time.time() - self.start_time, 2)
        log_file = self.logs_dir / f"{self.script_name}.txt"
        with open(log_file, "w", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({title})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write("-" * 80 + "\n")
            for e in self.entries:
                f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
            f.write("-" * 80 + "\n")
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
|
|
|
|
@pytest.fixture(autouse=True)
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
    """
    Autouse fixture that redirects every test to a throwaway workspace.

    A fresh temp directory receives a minimal config.toml, and the SLOP_*
    environment variables are pointed at files inside that directory so the
    user's real configuration files are not the ones referenced during tests.
    """
    workspace = tmp_path_factory.mktemp("isolated_workspace")
    config_path = workspace / "config.toml"

    import tomli_w

    default_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with open(config_path, "wb") as fh:
        tomli_w.dump(default_config, fh)

    # Insertion order mirrors the order in which the variables are set.
    env_overrides = {
        "SLOP_CONFIG": str(config_path),
        "SLOP_GLOBAL_PRESETS": str(workspace / "presets.toml"),
        "SLOP_GLOBAL_TOOL_PRESETS": str(workspace / "tool_presets.toml"),
        "SLOP_GLOBAL_PERSONAS": str(workspace / "personas.toml"),
        "SLOP_GLOBAL_WORKSPACE_PROFILES": str(workspace / "workspace_profiles.toml"),
    }
    for var_name, var_value in env_overrides.items():
        monkeypatch.setenv(var_name, var_value)

    yield
|
|
|
|
@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
    """
    Autouse fixture that calls ``paths.reset_resolved()`` both before and
    after every test, so resolved-path state does not carry over between tests.
    """
    from src import paths as paths_module

    # Before the test body runs.
    paths_module.reset_resolved()
    yield
    # After the test body has finished.
    paths_module.reset_resolved()
|
|
|
|
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
    """
    Autouse fixture that puts the ai_client module back into a known state
    before each test and resets its session again afterwards.

    This prevents state set by one test from leaking into the next.
    """
    from src import ai_client
    from src import mcp_client

    ai_client.reset_session()

    # Drop any callbacks a previous test may have installed.
    for hook_name in ("confirm_and_run_callback", "comms_log_callback", "tool_log_callback"):
        setattr(ai_client, hook_name, None)

    # Remove all registered event listeners.
    ai_client.events.clear()

    # Back to the default provider/model pair.
    ai_client.set_provider("gemini", "gemini-2.5-flash-lite")

    # Reconfigure the MCP client with two empty lists.
    mcp_client.configure([], [])

    yield

    ai_client.reset_session()
|
|
|
|
@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Provide a VerificationLogger named after the requesting test and its file stem."""
    node = request.node
    return VerificationLogger(node.name, Path(node.fspath).stem)
|
|
|
|
def kill_process_tree(pid: int | None) -> None:
|
|
"""Robustly kills a process and all its children."""
|
|
if pid is None:
|
|
return
|
|
try:
|
|
print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
|
|
if os.name == 'nt':
|
|
# /F is force, /T is tree (includes children)
|
|
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
check=False)
|
|
else:
|
|
# On Unix, kill the process group
|
|
os.killpg(os.getpgid(pid), signal.SIGKILL)
|
|
print(f"[Fixture] Process tree {pid} killed.")
|
|
except Exception as e:
|
|
print(f"[Fixture] Error killing process tree {pid}: {e}")
|
|
|
|
@pytest.fixture
def mock_app() -> Generator[App, None, None]:
    """
    Yield an App built with its config loading, GUI run loop, font loading
    and controller start-up hooks patched out, for unit tests that do not
    need a running loop. After the test, the controller (or, failing that,
    the app itself) is shut down.
    """
    stub_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with (
        patch('src.models.load_config', return_value=stub_config),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        gui_app = App()
        yield gui_app
        # Teardown: prefer the controller's shutdown; fall back to the app's
        # own shutdown only when there is no controller attribute.
        if hasattr(gui_app, 'controller'):
            gui_app.controller.shutdown()
        elif hasattr(gui_app, 'shutdown'):
            gui_app.shutdown()
|
|
|
|
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.

    On teardown both the controller's and the app's ``shutdown`` are invoked
    when those attributes exist.
    [C: tests/test_gui2_events.py:test_app_subscribes_to_events]
    """
    stub_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with (
        patch('src.models.load_config', return_value=stub_config),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        gui_app = App()
        yield gui_app
        # Cleanup: stop background work. Unlike a fallback chain, both
        # shutdown paths run when both attributes are present.
        if hasattr(gui_app, 'controller'):
            gui_app.controller.shutdown()

        if hasattr(gui_app, 'shutdown'):
            gui_app.shutdown()
|
|
|
|
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
    """
    Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
    Includes high-signal environment telemetry and workspace isolation.

    Setup builds a scratch workspace under tests/artifacts/live_gui_workspace,
    writes a config.toml there, launches the script through ``uv run python``
    with that workspace as its working directory, and polls
    http://127.0.0.1:8999/status for up to 15 seconds. If the hook server never
    answers, the process tree is killed and the fixture fails.

    Yields:
        A ``(process, gui_script)`` tuple: the Popen handle of the launched
        process and the absolute path of the script that was started.

    Teardown asks the GUI to reset its session (best effort), kills the
    process tree if it is still running, closes the log file and removes the
    scratch workspace.
    """
    gui_script = os.path.abspath("sloppy.py")
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    # NOTE(review): the logged value is "gui_2.py" although the launched script is sloppy.py - confirm intent.
    diag.log_state("GUI Script", "N/A", "gui_2.py")

    # 1. Create a isolated workspace for the live GUI
    temp_workspace = Path("tests/artifacts/live_gui_workspace")
    if temp_workspace.exists():
        # Retry a few times: removal can fail with PermissionError while files are still locked.
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
    # Create the workspace directory before writing files
    temp_workspace.mkdir(parents=True, exist_ok=True)

    # Create minimal project files to avoid cluttering root
    (temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
    (temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)

    # Create a local config.toml in temp_workspace
    config_content = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {
            'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
            'active': str((temp_workspace / 'manual_slop.toml').absolute())
        },
        'paths': {
            'logs_dir': str((temp_workspace / "logs").absolute()),
            'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
        },
        'tools': {
            'text_editors': {
                'vscode': {
                    'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
                    'diff_args': ['--new-window', '--diff']
                }
            },
            'default_editor': {'default_editor': 'vscode'}
        }
    }
    import tomli_w
    with open(temp_workspace / 'config.toml', 'wb') as f:
        tomli_w.dump(config_content, f)

    # Resolve absolute paths for shared resources
    project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    config_file = temp_workspace / "config.toml"

    cred_file = project_root / "credentials.toml"
    mcp_file = project_root / "mcp_env.toml"

    # Preserve GUI layout for tests
    layout_file = Path("manualslop_layout.ini")
    if layout_file.exists():
        shutil.copy2(layout_file, temp_workspace / layout_file.name)

    # Link assets for fonts
    src_assets = project_root / "assets"
    if src_assets.exists():
        assets_link = temp_workspace / "assets"
        if os.name == 'nt':
            subprocess.run(["cmd", "/c", "mklink", "/D", str(assets_link), str(src_assets)], check=False)
        elif not os.path.lexists(assets_link):
            # Skip when a link survived a failed cleanup; os.symlink would
            # otherwise raise FileExistsError and abort the whole fixture.
            os.symlink(src_assets, assets_link)

    # Check if already running (shouldn't be). If stale, kill the old process
    # before spawning a new one — otherwise the new subprocess fails to bind
    # port 8999 and the wait loop connects to the stale process instead,
    # leading to state pollution across batches.
    try:
        resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
        if resp.status_code == 200:
            print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...")
            netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5)
            stale_pids: set[int] = set()
            for line in netstat.stdout.splitlines():
                if ":8999" in line and "LISTENING" in line:
                    parts = line.split()
                    if parts:
                        # The last column is parsed as the owning PID; non-numeric values are skipped.
                        try:
                            stale_pids.add(int(parts[-1]))
                        except ValueError:
                            pass
            for pid in stale_pids:
                try:
                    subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5)
                    print(f"[Fixture] Killed stale PID {pid}")
                except Exception:
                    pass
            time.sleep(1.0)
            print("[Fixture] Proceeding with fresh sloppy.py spawn")
    except Exception:
        # Best effort: nothing answering on the port (or any probe failure) means there is nothing to clean up.
        pass

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
    os.makedirs("logs", exist_ok=True)
    log_file_name = Path(gui_script).name.replace('.', '_')
    log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")

    # Use environment variable to point to temp config if App supports it,
    # or just run from that CWD.
    env = os.environ.copy()
    env["PYTHONPATH"] = str(project_root.absolute())
    if config_file.exists():
        env["SLOP_CONFIG"] = str(config_file.absolute())
    if cred_file.exists():
        env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
    if mcp_file.exists():
        env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
    env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
    env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())

    try:
        process = subprocess.Popen(
            ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
            stdout=log_file,
            stderr=log_file,
            text=True,
            cwd=str(temp_workspace.absolute()),
            env=env,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0,
            # On POSIX give the child its own session/process group so that a
            # group-wide kill during teardown cannot also hit the test runner.
            start_new_session=os.name != 'nt',
        )
    except BaseException:
        # Do not leak the log handle when the launch itself fails.
        log_file.close()
        raise

    diag.log_state("GUI Process PID", "N/A", process.pid)

    max_retries = 15  # used as a time budget in seconds, not an attempt count
    ready = False
    print(f"[Fixture] Waiting up to {max_retries}s for Hook Server on port 8999...")
    start_time = time.time()
    while time.time() - start_time < max_retries:
        try:
            response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
            if response.status_code == 200:
                ready = True
                print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Stop polling early once the child has already exited.
            if process.poll() is not None:
                print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                break
        time.sleep(0.5)

    diag.log_state("Startup Success", "N/A", str(ready))
    diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")

    if not ready:
        diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
        print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
        kill_process_tree(process.pid)
        # The teardown below never runs on this path, so release the log handle here.
        log_file.close()
        pytest.fail(f"Failed to start {gui_script} with test hooks.")

    diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

    try:
        yield process, gui_script
    finally:
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        # Reset the GUI state before shutting down
        try:
            from src.api_hook_client import ApiHookClient
            client = ApiHookClient()
            client.reset_session()
            time.sleep(0.5)
        except Exception as e:
            # Best effort only; KeyboardInterrupt/SystemExit are no longer swallowed.
            print(f"[Fixture] reset_session during teardown failed: {e}")

        if process.poll() is None:
            kill_process_tree(process.pid)
            # On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
            # the handle is valid until waited on.
            try:
                process.wait(timeout=2)
            except Exception:
                pass

        time.sleep(0.5)
        log_file.close()
        # Cleanup temp workspace with retry for Windows file locks
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
            except Exception:
                # Any other failure: give up on cleanup without failing teardown.
                break