955b61df78
The os._exit(2) change in 719c5e27 introduced a regression: the watchdog's daemon thread continues running through pytest's interpreter shutdown. On EVERY batch (even ones that complete successfully in 17s), the watchdog's time.sleep(30.0) elapses during finalization and the thread calls os._exit(2) just as pytest is wrapping up. Result: every batch was reported as 'Batch N failed' by run_tests_batched.py, even ones with '126 passed in 17.14s'.
Revert watchdog to os._exit(0) — its original purpose (force-exit any stuck pytest at 30s) doesn't need a non-zero code; it's a sledgehammer, not a signal. The runner does its own failure detection.
Update scripts/run_tests_batched.py to:
- Use subprocess.run(timeout=180) per batch
- Catch TimeoutExpired as a batch failure (with elapsed time + reason printed)
- Catch CalledProcessError as a batch failure (preserved from before)
- Print elapsed time for every batch (pass or fail) so hang behavior is visible
- Print a final summary that lists all FAILED FILES (not batches) for easy re-running
- Add --batch-size and --timeout CLI flags
- Add 1-space indentation + type hints per project style
Verified: ast.parse OK; --help works; test_conftest_watchdog 3/3 pass.
473 lines
18 KiB
Python
473 lines
18 KiB
Python
import pytest
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
import os
|
|
import signal
|
|
import sys
|
|
import datetime
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Generator, Any
|
|
from unittest.mock import patch
|
|
|
|
# Make the vendored third-party tree and the repository root importable.
# Both are located relative to this file; each is pushed to the front of
# sys.path only if that exact string is not already present.
_conftest_dir = os.path.dirname(__file__)
thirdparty_dir = os.path.join(_conftest_dir, "..", "thirdparty")
project_root = os.path.abspath(os.path.join(_conftest_dir, ".."))
for _path_entry in (thirdparty_dir, project_root):
    # project_root is handled last, so it ends up ahead of thirdparty_dir.
    if _path_entry not in sys.path:
        sys.path.insert(0, _path_entry)
del _conftest_dir, _path_entry
|
|
|
|
# Runs at conftest import time, before the project modules below are imported.
# NOTE(review): presumably this activates the `defer` package's syntax sugar
# for subsequently imported modules — confirm against the defer documentation.
from defer.sugar import install
install()
|
|
|
|
# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
|
|
# and the message in workflow.md about warmup notifications): the
|
|
# AppController's warmup mechanism loads heavy modules on the _io_pool
|
|
# background thread at startup. Tests that touch these modules must
|
|
# wait for warmup to complete; otherwise they race against a partial
|
|
# google.genai import and hit "partially initialized" errors.
|
|
#
|
|
# Wait for the warmup before any test runs. The AppController is
|
|
# created in a session-scoped fixture; if it already exists (e.g.,
|
|
# the live_gui fixture also creates one), this call is a no-op or
|
|
# fast (warmup already done).
|
|
#
|
|
# HANG PROTECTION: The run_tests_batched.py runner hangs at the end
|
|
# of a batch when the pytest subprocess fails to exit cleanly. Two
|
|
# hang chains have been observed:
|
|
# 1. ThreadPoolExecutor.__del__ -> shutdown(wait=True) joining a
|
|
# blocked worker (concurrent.futures._python_exit, pool __del__,
|
|
# etc.). An earlier atexit fix at commit 8957c9a5 attempted to
|
|
# preempt this; verified empirically that atexit handlers do NOT
|
|
# fire at all when a pool worker is blocked in user code, so the
|
|
# fix is ineffective (see src/io_pool.py module docstring).
|
|
# 2. The session-scoped `live_gui` fixture teardown (conftest.py:~451)
|
|
# hangs in client.reset_session() (HTTP call to the hook server)
|
|
# or kill_process_tree(process.pid) / process.wait(timeout=2)
|
|
# (waiting for the sloppy.py subprocess to die on Windows).
|
|
# Both chains keep the pytest subprocess alive indefinitely, which
|
|
# makes run_tests_batched.py hang at subprocess.run() waiting for the
|
|
# child to exit.
|
|
#
|
|
# Solution: a daemon-thread watchdog that unconditionally calls
|
|
# os._exit(0) after a generous timeout. If pytest exits cleanly
|
|
# first, the thread is killed when the process tears down
|
|
# (daemon=True). If pytest hangs, the watchdog kicks in and the
|
|
# batched runner can move to the next batch. 30s timeout: batches
|
|
# 1-3 in the user's run completed in 1-5s of test execution; 30s
|
|
# leaves headroom for slow batches while bounding the worst-case
|
|
# hang at half a minute. See src/app_controller.py:_install_sigint_exit_handler
|
|
# for the same pattern (SIGINT + os._exit(0)) applied to the
|
|
# production Ctrl+C path.
|
|
import atexit
from src.app_controller import AppController

# Created and waited on at conftest import time, i.e. before pytest collects
# or runs any test. A timeout only produces a RuntimeWarning; the session
# still proceeds.
_warmup_app_controller = AppController()
if not _warmup_app_controller.wait_for_warmup(timeout=60.0):
    import warnings
    warnings.warn(
        "AppController warmup did not complete within 60s. "
        "Tests that depend on warmup modules (google.genai, anthropic, "
        "openai, etc.) may fail.",
        RuntimeWarning,
        stacklevel=2,
    )


def _watchdog_exit() -> None:
    """Sleep 30 seconds, then terminate the whole process with status 0.

    os._exit() ends the process immediately without running atexit handlers
    or interpreter finalization, which is what lets it break a hung shutdown.
    """
    import time
    time.sleep(30.0)
    os._exit(0)


import threading
# The thread is started at import time, so the 30s clock begins when this
# conftest is loaded, not when the test session finishes.
# NOTE(review): a session whose wall-clock time exceeds 30s would therefore be
# terminated mid-run with exit status 0 — confirm that every batch is expected
# to finish well within that window, or that the runner detects truncation.
threading.Thread(target=_watchdog_exit, daemon=True, name="conftest-hang-watchdog").start()
|
|
|
|
from src.gui_2 import App
|
|
|
|
class VerificationLogger:
    """Collect before/after observations for one test and write a text report.

    Constructing an instance creates (if missing) a directory named after the
    current local time (``YYYYmmdd_HHMMSS``) under ``tests/logs/`` relative to
    the current working directory. ``finalize`` writes the report into that
    directory as ``<script_name>.txt``.
    """

    def __init__(self, test_name: str, script_name: str) -> None:
        """Record the test identity and prepare the output directory.

        Args:
            test_name: Name shown in the report header and summary line.
            script_name: File stem of the report written by ``finalize``.
        """
        self.test_name = test_name
        self.script_name = script_name
        # One dict per log_state() call; all values are strings keyed by
        # "Field", "Before", "After" and "Delta".
        self.entries: list[dict[str, str]] = []
        self.start_time = time.time()
        # Route artifacts to tests/logs/
        self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.logs_dir.mkdir(parents=True, exist_ok=True)

    def log_state(self, field: str, before: Any, after: Any) -> None:
        """
        Append one before/after row to the report.

        When both values are numeric, the signed difference ``after - before``
        is recorded as the delta (with an explicit ``+`` for increases);
        otherwise the delta is left empty. Values are stored as ``str()``.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        delta = ""
        if isinstance(before, (int, float)) and isinstance(after, (int, float)):
            diff = after - before
            delta = f"{'+' if diff > 0 else ''}{diff}"
        self.entries.append({
            "Field": field,
            "Before": str(before),
            "After": str(after),
            "Delta": delta
        })

    def finalize(self, title: str, status: str, result_msg: str) -> None:
        """
        Write the collected rows to ``<logs_dir>/<script_name>.txt`` and print a summary line.

        Args:
            title: Free-form heading written in parentheses under the test name.
            status: Verdict token placed at the start of the summary line (e.g. "PASS").
            result_msg: Short explanation appended to the summary line.

        [C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
        """
        # The previous version computed the elapsed time here and discarded
        # the result; that dead statement has been dropped. start_time is
        # still recorded in __init__ for callers that want it.
        log_file = self.logs_dir / f"{self.script_name}.txt"
        with open(log_file, "w", encoding="utf-8") as f:
            f.write(f"[ Test: {self.test_name} ]\n")
            f.write(f"({title})\n\n")
            f.write(f"{self.test_name}: before vs after\n")
            f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
            f.write("-" * 80 + "\n")
            for e in self.entries:
                f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
            f.write("-" * 80 + "\n")
            f.write(f"{status} {self.test_name} ({result_msg})\n\n")
        print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
|
|
|
|
@pytest.fixture(autouse=True)
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
    """
    Autouse fixture that points every test at a throwaway workspace.

    A fresh temporary directory receives a minimal config.toml, and the
    SLOP_* environment variables are redirected into that directory for the
    duration of the test, so the user's real config and preset files are
    never read or overwritten.
    """
    test_workspace = tmp_path_factory.mktemp("isolated_workspace")
    config_path = test_workspace / "config.toml"

    import tomli_w
    minimal_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with open(config_path, "wb") as config_fh:
        tomli_w.dump(minimal_config, config_fh)

    # Environment variable -> file inside the isolated workspace.
    redirected = {
        "SLOP_CONFIG": config_path,
        "SLOP_GLOBAL_PRESETS": test_workspace / "presets.toml",
        "SLOP_GLOBAL_TOOL_PRESETS": test_workspace / "tool_presets.toml",
        "SLOP_GLOBAL_PERSONAS": test_workspace / "personas.toml",
        "SLOP_GLOBAL_WORKSPACE_PROFILES": test_workspace / "workspace_profiles.toml",
    }
    for env_name, target in redirected.items():
        monkeypatch.setenv(env_name, str(target))

    yield
|
|
|
|
@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
    """
    Autouse fixture that clears the resolved-paths state of ``src.paths``
    both before and after each test.
    """
    from src import paths as paths_module

    paths_module.reset_resolved()  # clean slate going in
    yield
    paths_module.reset_resolved()  # and nothing left behind for the next test
|
|
|
|
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
    """
    Autouse fixture that returns the ai_client module to a known state
    before each test and resets its session again afterwards.

    This prevents callbacks, event listeners, provider selection or MCP
    configuration from leaking between tests.
    """
    from src import ai_client
    from src import mcp_client

    ai_client.reset_session()

    # Drop any callbacks a previous test may have installed.
    for callback_attr in ("confirm_and_run_callback", "comms_log_callback", "tool_log_callback"):
        setattr(ai_client, callback_attr, None)

    # Remove every registered event listener.
    ai_client.events.clear()

    # Back to the default provider/model pair.
    ai_client.set_provider("gemini", "gemini-2.5-flash-lite")

    # MCP client starts each test with empty configuration.
    mcp_client.configure([], [])

    yield

    ai_client.reset_session()
|
|
|
|
@pytest.fixture
def vlogger(request) -> VerificationLogger:
    """Provide a VerificationLogger named after the requesting test and its file."""
    node = request.node
    return VerificationLogger(
        test_name=node.name,
        script_name=Path(node.fspath).stem,
    )
|
|
|
|
def kill_process_tree(pid: int | None) -> None:
|
|
"""Robustly kills a process and all its children."""
|
|
if pid is None:
|
|
return
|
|
try:
|
|
print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
|
|
if os.name == 'nt':
|
|
# /F is force, /T is tree (includes children)
|
|
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
check=False)
|
|
else:
|
|
# On Unix, kill the process group
|
|
os.killpg(os.getpgid(pid), signal.SIGKILL)
|
|
print(f"[Fixture] Process tree {pid} killed.")
|
|
except Exception as e:
|
|
print(f"[Fixture] Error killing process tree {pid}: {e}")
|
|
|
|
@pytest.fixture
def mock_app() -> Generator[App, None, None]:
    """
    Mock version of the App for simple unit tests that don't need a loop.

    Config loading/saving, project and session managers, the GUI run loop,
    font loading, post-init, and the controller's service start-up are all
    patched out while the App is constructed and used. On teardown the
    controller is shut down if the app has one; otherwise the app's own
    shutdown is called when present.
    """
    stub_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with (
        patch('src.models.load_config', return_value=stub_config),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        patched_app = App()
        yield patched_app

        # Teardown runs while the patches are still active.
        if hasattr(patched_app, 'controller'):
            patched_app.controller.shutdown()
        elif hasattr(patched_app, 'shutdown'):
            patched_app.shutdown()
|
|
|
|
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
    """
    Centralized App instance with all external side effects mocked.
    Matches the pattern used in test_token_viz.py and test_gui_phase4.py.

    Unlike a controller-only teardown, this fixture shuts down the controller
    (when present) and then also calls the app's own shutdown (when present).

    [C: tests/test_gui2_events.py:test_app_subscribes_to_events]
    """
    stub_config = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {'paths': [], 'active': ''},
        'gui': {'show_windows': {}}
    }
    with (
        patch('src.models.load_config', return_value=stub_config),
        patch('src.models.save_config'),
        patch('src.gui_2.project_manager'),
        patch('src.gui_2.session_logger'),
        patch('src.gui_2.immapp.run'),
        patch('src.app_controller.AppController._load_active_project'),
        patch('src.app_controller.AppController._fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
        patch('src.app_controller.AppController._prune_old_logs'),
        patch('src.app_controller.AppController.start_services'),
        patch('src.app_controller.AppController._init_ai_and_hooks'),
        patch('src.performance_monitor.PerformanceMonitor')
    ):
        patched_app = App()
        yield patched_app

        # Cleanup: make sure background threads are stopped. Both checks are
        # independent, so an app exposing both gets both calls, in this order.
        if hasattr(patched_app, 'controller'):
            patched_app.controller.shutdown()
        if hasattr(patched_app, 'shutdown'):
            patched_app.shutdown()
|
|
|
|
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
    """
    Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
    Includes high-signal environment telemetry and workspace isolation.

    Steps:
      1. Build a scratch workspace under tests/artifacts/ with its own
         project file, config.toml, layout file and a link to the assets.
      2. If something already answers on the hook port, try to kill it.
      3. Spawn the GUI via ``uv run`` with output redirected to a log file,
         then poll the hook server's /status endpoint for up to 15 seconds.
      4. Yield to the tests; afterwards reset the GUI session, kill the
         process tree, close the log and remove the workspace.

    Yields:
        A ``(process, gui_script)`` tuple: the Popen handle of the spawned
        GUI and the absolute path of the script that was started.

    Fails the requesting test (``pytest.fail``) if the hook server does not
    come up in time or the GUI process dies during startup.
    """
    gui_script = os.path.abspath("sloppy.py")
    diag = VerificationLogger("live_gui_startup", "live_gui_diag")
    diag.log_state("GUI Script", "N/A", "gui_2.py")

    # 1. Create a isolated workspace for the live GUI
    temp_workspace = Path("tests/artifacts/live_gui_workspace")
    if temp_workspace.exists():
        # Retry: on Windows a just-exited GUI may still hold file locks.
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
    # Create the workspace directory before writing files
    temp_workspace.mkdir(parents=True, exist_ok=True)

    # Create minimal project files to avoid cluttering root
    (temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
    (temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)

    # Create a local config.toml in temp_workspace
    config_content = {
        'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
        'projects': {
            'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
            'active': str((temp_workspace / 'manual_slop.toml').absolute())
        },
        'paths': {
            'logs_dir': str((temp_workspace / "logs").absolute()),
            'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
        },
        'tools': {
            'text_editors': {
                'vscode': {
                    'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
                    'diff_args': ['--new-window', '--diff']
                }
            },
            'default_editor': {'default_editor': 'vscode'}
        }
    }
    import tomli_w
    with open(temp_workspace / 'config.toml', 'wb') as f:
        tomli_w.dump(config_content, f)

    # Resolve absolute paths for shared resources
    project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    config_file = temp_workspace / "config.toml"

    cred_file = project_root / "credentials.toml"
    mcp_file = project_root / "mcp_env.toml"

    # Preserve GUI layout for tests
    layout_file = Path("manualslop_layout.ini")
    if layout_file.exists():
        shutil.copy2(layout_file, temp_workspace / layout_file.name)

    # Link assets for fonts
    src_assets = project_root / "assets"
    if src_assets.exists():
        if os.name == 'nt':
            subprocess.run(["cmd", "/c", "mklink", "/D", str(temp_workspace / "assets"), str(src_assets)], check=False)
        else:
            os.symlink(src_assets, temp_workspace / "assets")

    # Check if already running (shouldn't be). If stale, kill the old process
    # before spawning a new one — otherwise the new subprocess fails to bind
    # port 8999 and the wait loop connects to the stale process instead,
    # leading to state pollution across batches.
    try:
        resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
        if resp.status_code == 200:
            print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...")
            netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5)
            stale_pids: set[int] = set()
            for line in netstat.stdout.splitlines():
                if ":8999" in line and "LISTENING" in line:
                    parts = line.split()
                    if parts:
                        # The last column is taken as the owning PID.
                        try:
                            stale_pids.add(int(parts[-1]))
                        except ValueError:
                            pass
            for pid in stale_pids:
                try:
                    subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5)
                    print(f"[Fixture] Killed stale PID {pid}")
                except Exception:
                    pass
            time.sleep(1.0)
            print("[Fixture] Proceeding with fresh sloppy.py spawn")
    except Exception:
        # Expected path: nothing is listening, so the probe itself fails.
        pass

    print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
    os.makedirs("logs", exist_ok=True)
    log_file_name = Path(gui_script).name.replace('.', '_')
    # Kept open for the child's lifetime; closed on startup failure and in teardown.
    log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")

    # Use environment variable to point to temp config if App supports it,
    # or just run from that CWD.
    env = os.environ.copy()
    env["PYTHONPATH"] = str(project_root.absolute())
    if config_file.exists():
        env["SLOP_CONFIG"] = str(config_file.absolute())
    if cred_file.exists():
        env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
    if mcp_file.exists():
        env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
    env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
    env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())

    process = subprocess.Popen(
        ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
        stdout=log_file,
        stderr=log_file,
        text=True,
        cwd=str(temp_workspace.absolute()),
        env=env,
        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0,
        # POSIX counterpart of CREATE_NEW_PROCESS_GROUP: give the child its
        # own session/process group so that a group-wide SIGKILL during
        # teardown cannot also hit the pytest process.
        start_new_session=(os.name != 'nt'),
    )

    diag.log_state("GUI Process PID", "N/A", process.pid)

    startup_timeout_s = 15
    ready = False
    print(f"[Fixture] Waiting up to {startup_timeout_s}s for Hook Server on port 8999...")
    start_time = time.time()
    while time.time() - start_time < startup_timeout_s:
        try:
            response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
            if response.status_code == 200:
                ready = True
                print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
                break
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Not listening yet; give up early if the child has already exited.
            if process.poll() is not None:
                print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
                break
        time.sleep(0.5)

    diag.log_state("Startup Success", "N/A", str(ready))
    diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")

    if not ready:
        diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
        print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
        kill_process_tree(process.pid)
        # The teardown below is never reached on this path, so release the
        # log handle here instead of leaking it for the rest of the session.
        log_file.close()
        pytest.fail(f"Failed to start {gui_script} with test hooks.")

    diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")

    try:
        yield process, gui_script
    finally:
        print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
        # Reset the GUI state before shutting down
        try:
            from src.api_hook_client import ApiHookClient
            client = ApiHookClient()
            client.reset_session()
            time.sleep(0.5)
        except Exception as e:
            # Best effort only; the process is killed below regardless.
            # (Exception, not a bare except, so Ctrl+C still interrupts.)
            print(f"[Fixture] reset_session during teardown failed (ignored): {e}")

        if process.poll() is None:
            kill_process_tree(process.pid)
            # On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
            # the handle is valid until waited on.
            try:
                process.wait(timeout=2)
            except Exception:
                pass

        time.sleep(0.5)
        log_file.close()
        # Cleanup temp workspace with retry for Windows file locks
        for _ in range(5):
            try:
                shutil.rmtree(temp_workspace)
                break
            except PermissionError:
                time.sleep(0.5)
            except Exception:
                break