7bcb5a8c07
Eliminates 22 call sites that bypassed the AppController state owner
and read/wrote config.toml directly. AppController is now the single
source of truth for self.config; gui_2.py, commands.py, etc. go
through controller.save_config() / controller.load_config().
Production changes:
- src/models.py: rename load_config -> _load_config_from_disk,
save_config -> _save_config_to_disk (private I/O primitives)
- src/app_controller.py: add public load_config()/save_config() methods
that own the state. Update 3 internal call sites and 3 ConductorEngine
call sites to pass max_workers from self.config
- src/multi_agent_conductor.py: ConductorEngine.__init__ now takes
max_workers as a parameter (caller responsibility, not I/O primitive)
- src/external_editor.py: get_default_launcher() takes config as a
parameter; gui_2.py:1311,4776 pass app.config
- src/gui_2.py: 17 sites of models.save_config(X.config) replaced with
X.save_config() (delegates via __getattr__ to controller)
- src/commands.py: save_all() uses app.save_config()
Test changes (route through controller, not I/O primitive):
- tests/conftest.py: mock_app and app_instance fixtures now patch
AppController.load_config/save_config instead of models I/O primitives
- 18 other test files: patches renamed from models._save_config_to_disk
to AppController.save_config (and same for load_config)
- tests/test_app_controller_mcp.py: use SLOP_CONFIG env var instead of
patching removed CONFIG_PATH module constant
- tests/test_parallel_execution.py: pass max_workers=2 explicitly to
ConductorEngine (caller no longer reads config)
- tests/test_gui_paths.py: add save_config=MagicMock() to MockApp;
assert on controller method, not I/O primitive
- tests/test_models_no_top_level_tomli_w.py: still calls private
_save_config_to_disk directly (the only allowed exception; tests
the lazy-load behavior of the primitive itself)
New files:
- scripts/audit_no_models_config_io.py: enforces the rule (--strict,
--json modes; AST-based docstring detection to avoid false positives)
- conductor/code_styleguides/config_state_owner.md: documents the rule
Verification:
- 67 targeted tests pass
- scripts/audit_no_models_config_io.py --strict returns 0
This is the architectural cleanup that surfaced during the
audit_architectural_cheats_20260607 review. Closes the smoke-gun
CONFIG_PATH module constant (already done in 0c7ebf22) AND the
free-function models.load_config/save_config smell.
[conductor(checkpoint): config-iO-refactor-20260607]
537 lines
21 KiB
Python
537 lines
21 KiB
Python
import pytest
|
|
import subprocess
|
|
import time
|
|
import requests
|
|
import os
|
|
import signal
|
|
import sys
|
|
import datetime
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Generator, Any
|
|
from unittest.mock import patch
|
|
|
|
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
|
|
if thirdparty_dir not in sys.path:
|
|
sys.path.insert(0, thirdparty_dir)
|
|
|
|
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
if project_root not in sys.path:
|
|
sys.path.insert(0, project_root)
|
|
|
|
from defer.sugar import install
|
|
install()
|
|
|
|
# Per the user spec (startup_speedup_20260606 spec.md:2.2 Layer 3,
|
|
# and the message in workflow.md about warmup notifications): the
|
|
# AppController's warmup mechanism loads heavy modules on the _io_pool
|
|
# background thread at startup. Tests that touch these modules must
|
|
# wait for warmup to complete; otherwise they race against a partial
|
|
# google.genai import and hit "partially initialized" errors.
|
|
#
|
|
# Wait for the warmup before any test runs. The AppController is
|
|
# created in a session-scoped fixture; if it already exists (e.g.,
|
|
# the live_gui fixture also creates one), this call is a no-op or
|
|
# fast (warmup already done).
|
|
#
|
|
# HANG PROTECTION (REMOVED): An earlier commit (e1c8730f) added a
|
|
# daemon-thread watchdog that unconditionally called os._exit(0) after
|
|
# 30s. The intent was to bound hangs from ThreadPoolExecutor.__del__
|
|
# and the live_gui fixture teardown. Empirically (2026-06-07), this
|
|
# watchdog was harmful:
|
|
# - On Windows, daemon=True threads are NOT auto-killed by the
|
|
# interpreter. The watchdog's time.sleep(30) continues through
|
|
# pytest's normal shutdown, then os._exit(0) fires.
|
|
# - For batches that take >30s (e.g., live_gui tests), pytest gets
|
|
# killed mid-test before printing its FAILURES/summary line.
|
|
# - The os._exit(0) hides pytest's actual exit code, so the
|
|
# run_tests_batched.py runner reports 'Batch N passed' even when
|
|
# tests had failed (e.g., 5 F's in test_ticket_queue).
|
|
#
|
|
# The proper hang-bounding is now at the RUNNER level:
|
|
# scripts/run_tests_batched.py uses subprocess.run(timeout=1000) per
|
|
# batch. If pytest hangs, the runner kills it after 1000s and reports
|
|
# failure. Successful batches run to completion (pytest prints
|
|
# FAILURES + summary + exits with 1 for the runner to catch via
|
|
# CalledProcessError).
|
|
import atexit
|
|
from src.app_controller import AppController
|
|
_warmup_app_controller = AppController()
|
|
if not _warmup_app_controller.wait_for_warmup(timeout=60.0):
|
|
import warnings
|
|
warnings.warn(
|
|
"AppController warmup did not complete within 60s. "
|
|
"Tests that depend on warmup modules (google.genai, anthropic, "
|
|
"openai, etc.) may fail.",
|
|
RuntimeWarning,
|
|
stacklevel=2,
|
|
)
|
|
|
|
# HANG PROTECTION (signal-based watchdog). Two observed hang chains
|
|
# from e1c8730f and the prior naive watchdog:
|
|
# 1. ThreadPoolExecutor.__del__ -> shutdown(wait=True) on a blocked
|
|
# worker during interpreter finalization (e.g., the io_pool
|
|
# created in AppController.__init__ at conftest line 65).
|
|
# 2. The session-scoped `live_gui` fixture teardown hanging in
|
|
# client.reset_session() (HTTP call to the hook server) or
|
|
# kill_process_tree(process.pid) / process.wait(timeout=2) waiting
|
|
# for the sloppy.py subprocess to die on Windows.
|
|
# The naive 30s os._exit(0) approach CUT OFF BATCHES MID-TEST (every
|
|
# batch exited at 32.0s exactly). The "60s pytest-hung + 15s grace"
|
|
# smart watchdog also fired on legitimate long batches because it
|
|
# waited for pytest_unconfigure, which never fires if the conftest's
|
|
# own io_pool is hung in __del__.
|
|
#
|
|
# CORRECT approach: signal-based. Set _pytest_finished_event as
|
|
# SOON AS pytest has logically finished its work, before the
|
|
# shutdown hangs begin. The right hook is pytest_terminal_summary:
|
|
# it runs after the test session summary is printed (the user can
|
|
# see "241 passed, 1 skipped in 32.30s" in the output) but BEFORE
|
|
# finalization. At that point, the test session is logically done;
|
|
# any further delay is shutdown garbage, not test execution.
|
|
#
|
|
# Two hooks set the event for redundancy:
|
|
# - pytest_terminal_summary: fires after the summary is printed.
|
|
# This is the primary signal.
|
|
# - pytest_unconfigure: fires at the very end, after the summary.
|
|
# Fallback in case the terminal summary hook isn't reached (e.g.,
|
|
# pytest crashes mid-summary).
|
|
# After the event fires, give 5s for normal finalization, then
|
|
# os._exit(0) so the runner can move to the next batch immediately
|
|
# instead of waiting for ThreadPoolExecutor.__del__ to unblock
|
|
# (which can take 60+ seconds).
|
|
#
|
|
# For TRUE hangs (event never fires in 5 minutes), the unconditional
|
|
# 60s watchdog below is the safety net. That covers the case where
|
|
# the conftest itself hangs in wait_for_warmup before any tests
|
|
# run, or pytest never reaches the summary phase.
|
|
import threading
|
|
_pytest_finished_event: threading.Event = threading.Event()
|
|
|
|
def pytest_terminal_summary(terminalreporter: object, exitstatus: int, config: object) -> None:
|
|
_pytest_finished_event.set()
|
|
|
|
def pytest_unconfigure(config: object) -> None:
|
|
_pytest_finished_event.set()
|
|
|
|
def _smart_watchdog_exit() -> None:
|
|
if not _pytest_finished_event.wait(timeout=300.0):
|
|
os._exit(2)
|
|
import time
|
|
time.sleep(5.0)
|
|
os._exit(0)
|
|
|
|
threading.Thread(target=_smart_watchdog_exit, daemon=True, name="conftest-smart-watchdog").start()
|
|
|
|
def _unconditional_watchdog_exit() -> None:
|
|
"""Hard fail-safe: also signal-based, but with a much longer
|
|
timeout than the smart watchdog.
|
|
|
|
The smart watchdog (above) uses 300s. This sledgehammer waits
|
|
900s (15 minutes) for the same signal, so a long-running test
|
|
can take up to 15 minutes before we declare it a hang. The
|
|
only case this catches that the smart doesn't: pytest finishes
|
|
but the test session is so long the smart's 300s expires first.
|
|
In that case we still want the runner to move on.
|
|
|
|
If the signal never fires (true hang), os._exit(2) so the runner
|
|
catches it as CalledProcessError.
|
|
"""
|
|
if not _pytest_finished_event.wait(timeout=900.0):
|
|
os._exit(2)
|
|
import time
|
|
time.sleep(5.0)
|
|
os._exit(0)
|
|
|
|
threading.Thread(target=_unconditional_watchdog_exit, daemon=True, name="conftest-unconditional-watchdog").start()
|
|
|
|
from src.gui_2 import App
|
|
|
|
class VerificationLogger:
|
|
def __init__(self, test_name: str, script_name: str) -> None:
|
|
self.test_name = test_name
|
|
self.script_name = script_name
|
|
self.entries = []
|
|
self.start_time = time.time()
|
|
# Route artifacts to tests/logs/
|
|
self.logs_dir = Path(f"tests/logs/{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}")
|
|
self.logs_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def log_state(self, field: str, before: Any, after: Any) -> None:
|
|
"""
|
|
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
|
|
"""
|
|
delta = ""
|
|
if isinstance(before, (int, float)) and isinstance(after, (int, float)):
|
|
diff = after - before
|
|
delta = f"{'+' if diff > 0 else ''}{diff}"
|
|
self.entries.append({
|
|
"Field": field,
|
|
"Before": str(before),
|
|
"After": str(after),
|
|
"Delta": delta
|
|
})
|
|
|
|
def finalize(self, title: str, status: str, result_msg: str) -> None:
|
|
"""
|
|
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
|
|
"""
|
|
round(time.time() - self.start_time, 2)
|
|
log_file = self.logs_dir / f"{self.script_name}.txt"
|
|
with open(log_file, "w", encoding="utf-8") as f:
|
|
f.write(f"[ Test: {self.test_name} ]\n")
|
|
f.write(f"({title})\n\n")
|
|
f.write(f"{self.test_name}: before vs after\n")
|
|
f.write(f"{'Field':<25} {'Before':<20} {'After':<20} {'Delta':<15}\n")
|
|
f.write("-" * 80 + "\n")
|
|
for e in self.entries:
|
|
f.write(f"{e['Field']:<25} {e['Before']:<20} {e['After']:<20} {e['Delta']:<15}\n")
|
|
f.write("-" * 80 + "\n")
|
|
f.write(f"{status} {self.test_name} ({result_msg})\n\n")
|
|
print(f"[FINAL] {self.test_name}: {status} - {result_msg}")
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def isolate_workspace(tmp_path_factory, monkeypatch) -> Generator[None, None, None]:
|
|
"""
|
|
Autouse fixture to isolate tests from the active user workspace.
|
|
Protects the real config.toml and manual_slop.toml from being overwritten.
|
|
"""
|
|
test_workspace = tmp_path_factory.mktemp("isolated_workspace")
|
|
|
|
config_path = test_workspace / "config.toml"
|
|
import tomli_w
|
|
with open(config_path, "wb") as f:
|
|
tomli_w.dump({
|
|
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
|
|
'projects': {'paths': [], 'active': ''},
|
|
'gui': {'show_windows': {}}
|
|
}, f)
|
|
|
|
monkeypatch.setenv("SLOP_CONFIG", str(config_path))
|
|
monkeypatch.setenv("SLOP_GLOBAL_PRESETS", str(test_workspace / "presets.toml"))
|
|
monkeypatch.setenv("SLOP_GLOBAL_TOOL_PRESETS", str(test_workspace / "tool_presets.toml"))
|
|
monkeypatch.setenv("SLOP_GLOBAL_PERSONAS", str(test_workspace / "personas.toml"))
|
|
monkeypatch.setenv("SLOP_GLOBAL_WORKSPACE_PROFILES", str(test_workspace / "workspace_profiles.toml"))
|
|
|
|
yield
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_paths() -> Generator[None, None, None]:
|
|
"""
|
|
|
|
|
|
Autouse fixture that resets the paths global state before each test.
|
|
"""
|
|
from src import paths
|
|
paths.reset_resolved()
|
|
yield
|
|
paths.reset_resolved()
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_ai_client() -> Generator[None, None, None]:
|
|
"""
|
|
|
|
|
|
Autouse fixture that resets the ai_client global state before each test.
|
|
This is critical for preventing state pollution between tests.
|
|
"""
|
|
from src import ai_client
|
|
from src import mcp_client
|
|
ai_client.reset_session()
|
|
# Reset callbacks to None or default to ensure no carry-over
|
|
ai_client.confirm_and_run_callback = None
|
|
ai_client.comms_log_callback = None
|
|
ai_client.tool_log_callback = None
|
|
# Clear all event listeners
|
|
ai_client.events.clear()
|
|
# Reset provider/model to defaults
|
|
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
|
# Reset MCP client state
|
|
mcp_client.configure([], [])
|
|
yield
|
|
ai_client.reset_session()
|
|
|
|
@pytest.fixture
|
|
def vlogger(request) -> VerificationLogger:
|
|
"""Fixture to provide a VerificationLogger instance to a test."""
|
|
test_name = request.node.name
|
|
script_name = Path(request.node.fspath).stem
|
|
return VerificationLogger(test_name, script_name)
|
|
|
|
def kill_process_tree(pid: int | None) -> None:
|
|
"""Robustly kills a process and all its children."""
|
|
if pid is None:
|
|
return
|
|
try:
|
|
print(f"[Fixture] Attempting to kill process tree for PID {pid}...")
|
|
if os.name == 'nt':
|
|
# /F is force, /T is tree (includes children)
|
|
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
check=False)
|
|
else:
|
|
# On Unix, kill the process group
|
|
os.killpg(os.getpgid(pid), signal.SIGKILL)
|
|
print(f"[Fixture] Process tree {pid} killed.")
|
|
except Exception as e:
|
|
print(f"[Fixture] Error killing process tree {pid}: {e}")
|
|
|
|
@pytest.fixture
|
|
def mock_app() -> Generator[App, None, None]:
|
|
"""
|
|
|
|
|
|
Mock version of the App for simple unit tests that don't need a loop.
|
|
"""
|
|
with (
|
|
patch('src.app_controller.AppController.load_config', return_value={
|
|
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
|
|
'projects': {'paths': [], 'active': ''},
|
|
'gui': {'show_windows': {}}
|
|
}),
|
|
patch('src.app_controller.AppController.save_config'),
|
|
patch('src.gui_2.project_manager'),
|
|
patch('src.gui_2.session_logger'),
|
|
patch('src.gui_2.immapp.run'),
|
|
patch('src.app_controller.AppController._load_active_project'),
|
|
patch('src.app_controller.AppController._fetch_models'),
|
|
patch.object(App, '_load_fonts'),
|
|
patch.object(App, '_post_init'),
|
|
patch('src.app_controller.AppController._prune_old_logs'),
|
|
patch('src.app_controller.AppController.start_services'),
|
|
patch('src.app_controller.AppController._init_ai_and_hooks'),
|
|
patch('src.performance_monitor.PerformanceMonitor')
|
|
):
|
|
app = App()
|
|
yield app
|
|
if hasattr(app, 'controller'):
|
|
app.controller.shutdown()
|
|
elif hasattr(app, 'shutdown'):
|
|
app.shutdown()
|
|
|
|
@pytest.fixture
|
|
def app_instance() -> Generator[App, None, None]:
|
|
"""
|
|
|
|
|
|
Centralized App instance with all external side effects mocked.
|
|
Matches the pattern used in test_token_viz.py and test_gui_phase4.py.
|
|
[C: tests/test_gui2_events.py:test_app_subscribes_to_events]
|
|
"""
|
|
with (
|
|
patch('src.app_controller.AppController.load_config', return_value={
|
|
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
|
|
'projects': {'paths': [], 'active': ''},
|
|
'gui': {'show_windows': {}}
|
|
}),
|
|
patch('src.app_controller.AppController.save_config'),
|
|
patch('src.gui_2.project_manager'),
|
|
patch('src.gui_2.session_logger'),
|
|
patch('src.gui_2.immapp.run'),
|
|
patch('src.app_controller.AppController._load_active_project'),
|
|
patch('src.app_controller.AppController._fetch_models'),
|
|
patch.object(App, '_load_fonts'),
|
|
patch.object(App, '_post_init'),
|
|
patch('src.app_controller.AppController._prune_old_logs'),
|
|
patch('src.app_controller.AppController.start_services'),
|
|
patch('src.app_controller.AppController._init_ai_and_hooks'),
|
|
patch('src.performance_monitor.PerformanceMonitor')
|
|
):
|
|
app = App()
|
|
yield app
|
|
# Cleanup: Ensure background threads are stopped
|
|
if hasattr(app, 'controller'):
|
|
app.controller.shutdown()
|
|
|
|
if hasattr(app, 'shutdown'):
|
|
app.shutdown()
|
|
|
|
@pytest.fixture(scope="session")
|
|
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
|
|
"""
|
|
|
|
|
|
Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
|
|
Includes high-signal environment telemetry and workspace isolation.
|
|
"""
|
|
gui_script = os.path.abspath("sloppy.py")
|
|
diag = VerificationLogger("live_gui_startup", "live_gui_diag")
|
|
diag.log_state("GUI Script", "N/A", "gui_2.py")
|
|
|
|
# 1. Create a isolated workspace for the live GUI
|
|
temp_workspace = Path("tests/artifacts/live_gui_workspace")
|
|
if temp_workspace.exists():
|
|
for _ in range(5):
|
|
try:
|
|
shutil.rmtree(temp_workspace)
|
|
break
|
|
except PermissionError:
|
|
time.sleep(0.5)
|
|
# Create the workspace directory before writing files
|
|
temp_workspace.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Create minimal project files to avoid cluttering root
|
|
(temp_workspace / "manual_slop.toml").write_text("[project]\nname = 'TestProject'\n\n[conductor]\ndir = 'conductor'\n", encoding="utf-8")
|
|
(temp_workspace / "conductor" / "tracks").mkdir(parents=True, exist_ok=True)
|
|
|
|
# Create a local config.toml in temp_workspace
|
|
config_content = {
|
|
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
|
|
'projects': {
|
|
'paths': [str((temp_workspace / 'manual_slop.toml').absolute())],
|
|
'active': str((temp_workspace / 'manual_slop.toml').absolute())
|
|
},
|
|
'paths': {
|
|
'logs_dir': str((temp_workspace / "logs").absolute()),
|
|
'scripts_dir': str((temp_workspace / "scripts" / "generated").absolute())
|
|
},
|
|
'tools': {
|
|
'text_editors': {
|
|
'vscode': {
|
|
'path': 'C:\\apps\\Microsoft VS Code\\Code.exe',
|
|
'diff_args': ['--new-window', '--diff']
|
|
}
|
|
},
|
|
'default_editor': {'default_editor': 'vscode'}
|
|
}
|
|
}
|
|
import tomli_w
|
|
with open(temp_workspace / 'config.toml', 'wb') as f:
|
|
tomli_w.dump(config_content, f)
|
|
|
|
# Resolve absolute paths for shared resources
|
|
project_root = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
config_file = temp_workspace / "config.toml"
|
|
|
|
cred_file = project_root / "credentials.toml"
|
|
mcp_file = project_root / "mcp_env.toml"
|
|
|
|
# Preserve GUI layout for tests
|
|
layout_file = Path("manualslop_layout.ini")
|
|
if layout_file.exists():
|
|
shutil.copy2(layout_file, temp_workspace / layout_file.name)
|
|
|
|
# Link assets for fonts
|
|
src_assets = project_root / "assets"
|
|
if src_assets.exists():
|
|
if os.name == 'nt':
|
|
subprocess.run(["cmd", "/c", "mklink", "/D", str(temp_workspace / "assets"), str(src_assets)], check=False)
|
|
else:
|
|
os.symlink(src_assets, temp_workspace / "assets")
|
|
|
|
# Check if already running (shouldn't be). If stale, kill the old process
|
|
# before spawning a new one — otherwise the new subprocess fails to bind
|
|
# port 8999 and the wait loop connects to the stale process instead,
|
|
# leading to state pollution across batches.
|
|
try:
|
|
resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
|
|
if resp.status_code == 200:
|
|
print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...")
|
|
netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5)
|
|
stale_pids: set[int] = set()
|
|
for line in netstat.stdout.splitlines():
|
|
if ":8999" in line and "LISTENING" in line:
|
|
parts = line.split()
|
|
if parts:
|
|
try: stale_pids.add(int(parts[-1]))
|
|
except ValueError: pass
|
|
for pid in stale_pids:
|
|
try:
|
|
subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5)
|
|
print(f"[Fixture] Killed stale PID {pid}")
|
|
except Exception: pass
|
|
time.sleep(1.0)
|
|
print("[Fixture] Proceeding with fresh sloppy.py spawn")
|
|
except Exception: pass
|
|
|
|
print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
|
|
os.makedirs("logs", exist_ok=True)
|
|
log_file_name = Path(gui_script).name.replace('.', '_')
|
|
log_file = open(f"logs/{log_file_name}_test.log", "w", encoding="utf-8")
|
|
|
|
# Use environment variable to point to temp config if App supports it,
|
|
# or just run from that CWD.
|
|
env = os.environ.copy()
|
|
env["PYTHONPATH"] = str(project_root.absolute())
|
|
if config_file.exists():
|
|
env["SLOP_CONFIG"] = str(config_file.absolute())
|
|
if cred_file.exists():
|
|
env["SLOP_CREDENTIALS"] = str(cred_file.absolute())
|
|
if mcp_file.exists():
|
|
env["SLOP_MCP_ENV"] = str(mcp_file.absolute())
|
|
env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
|
|
env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())
|
|
|
|
process = subprocess.Popen(
|
|
["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
|
|
stdout=log_file,
|
|
stderr=log_file,
|
|
text=True,
|
|
cwd=str(temp_workspace.absolute()),
|
|
env=env,
|
|
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
|
|
)
|
|
|
|
diag.log_state("GUI Process PID", "N/A", process.pid)
|
|
|
|
max_retries = 15
|
|
ready = False
|
|
print(f"[Fixture] Waiting up to {max_retries}s for Hook Server on port 8999...")
|
|
start_time = time.time()
|
|
while time.time() - start_time < max_retries:
|
|
try:
|
|
response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
|
|
if response.status_code == 200:
|
|
ready = True
|
|
print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
|
|
break
|
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
|
if process.poll() is not None:
|
|
print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
|
|
break
|
|
time.sleep(0.5)
|
|
|
|
diag.log_state("Startup Success", "N/A", str(ready))
|
|
diag.log_state("Startup Time", "N/A", f"{round(time.time() - start_time, 2)}s")
|
|
|
|
if not ready:
|
|
diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
|
|
print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
|
|
kill_process_tree(process.pid)
|
|
pytest.fail(f"Failed to start {gui_script} with test hooks.")
|
|
|
|
diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")
|
|
|
|
try:
|
|
yield process, gui_script
|
|
finally:
|
|
print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
|
|
# Reset the GUI state before shutting down
|
|
try:
|
|
from src.api_hook_client import ApiHookClient
|
|
client = ApiHookClient()
|
|
client.reset_session()
|
|
time.sleep(0.5)
|
|
except: pass
|
|
|
|
if process.poll() is None:
|
|
kill_process_tree(process.pid)
|
|
# On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
|
|
# the handle is valid until waited on.
|
|
try:
|
|
process.wait(timeout=2)
|
|
except:
|
|
pass
|
|
|
|
time.sleep(0.5)
|
|
log_file.close()
|
|
# Cleanup temp workspace with retry for Windows file locks
|
|
for _ in range(5):
|
|
try:
|
|
shutil.rmtree(temp_workspace)
|
|
break
|
|
except PermissionError:
|
|
time.sleep(0.5)
|
|
except:
|
|
break |