Private
Public Access
0
0

fix(test): per-worker workspace subdir + file-lock for xdist live_gui coordination

This commit is contained in:
2026-06-09 22:23:33 -04:00
parent 38cb0f99b4
commit 4bb19835db
+93 -34
View File
@@ -16,7 +16,7 @@ if project_root not in sys.path:
sys.path.insert(0, project_root)
_RUN_ID = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
_RUN_WORKSPACE = Path(f"tests/artifacts/live_gui_workspace_{_RUN_ID}_{os.getpid()}")
_RUN_WORKSPACE = Path(f"tests/artifacts/live_gui_workspace_{_RUN_ID}")
thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty")
if thirdparty_dir not in sys.path:
sys.path.insert(0, thirdparty_dir)
@@ -132,6 +132,23 @@ def pytest_terminal_summary(terminalreporter: object, exitstatus: int, config: o
def pytest_unconfigure(config: object) -> None:
_pytest_finished_event.set()
def pytest_collection_modifyitems(config: object, items: list[object]) -> None:
"""
xdist worker gate: only gw0 (or the master, if not running under xdist)
actually runs tests that depend on the live_gui fixture. Other workers
deselect those tests to avoid port-8999 collisions and to honor the
"don't multi-thread live_gui" directive.
[C: tests/conftest.py:live_gui fixture]
"""
_worker = os.environ.get("PYTEST_XDIST_WORKER", "master")
if _worker in ("master", "gw0"):
return
_skip = pytest.mark.skip(reason=f"live_gui tests only run on master/gw0 (this worker is {_worker})")
for item in items:
if "live_gui" in getattr(item, "fixturenames", ()):
item.add_marker(_skip)
_REQUIRED_TEST_IMPORTS: list[tuple[str, str]] = [
("sentence_transformers", "sentence-transformers"),
]
@@ -465,8 +482,10 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
diag = VerificationLogger("live_gui_startup", "live_gui_diag")
diag.log_state("GUI Script", "N/A", "gui_2.py")
# 1. Create a isolated workspace for the live GUI (per-run timestamped folder under tests/artifacts/)
temp_workspace = _RUN_WORKSPACE
# 1. Create a per-worker subdirectory under the run workspace so xdist workers
# don't pollute the main directory or collide on port 8999.
_worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master")
temp_workspace = _RUN_WORKSPACE / _worker_id
if temp_workspace.exists():
for _ in range(5):
try:
@@ -534,30 +553,63 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
else:
os.symlink(src_assets, temp_workspace / "assets")
# Check if already running (shouldn't be). If stale, kill the old process
# before spawning a new one — otherwise the new subprocess fails to bind
# port 8999 and the wait loop connects to the stale process instead,
# leading to state pollution across batches.
# File-based mutex to coordinate sloppy.py ownership across xdist workers.
# The first worker to acquire the lock spawns sloppy.py. Other workers see
# the lock and wait for the hook server to come up, then yield a client
# handle (process=None; teardown skips kill + workspace cleanup). This
# serializes I/O on port 8999 so we don't try to run live_gui in parallel.
_owner_lock = temp_workspace / ".live_gui_owner.lock"
_is_owner = False
try:
resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if resp.status_code == 200:
print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...")
netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5)
stale_pids: set[int] = set()
for line in netstat.stdout.splitlines():
if ":8999" in line and "LISTENING" in line:
parts = line.split()
if parts:
try: stale_pids.add(int(parts[-1]))
except ValueError: pass
for pid in stale_pids:
try:
subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5)
print(f"[Fixture] Killed stale PID {pid}")
except Exception: pass
time.sleep(1.0)
print("[Fixture] Proceeding with fresh sloppy.py spawn")
except Exception: pass
# O_EXCL: atomic create-or-fail. If file exists, another worker owns the subprocess.
# We close the fd immediately; the file's existence on disk IS the lock.
# Holding the fd open would prevent os.remove() in teardown on Windows.
_lock_fd = os.open(str(_owner_lock), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(_lock_fd)
_is_owner = True
print(f"[Fixture] {_worker_id} acquired live_gui owner lock")
# Stale lock from a crashed worker? If hook server is NOT up, remove the lock
# and yield a client handle. If hook server IS up, the lock is valid (another
# worker beat us to it between the O_EXCL check and now).
try:
resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if resp.status_code != 200:
print(f"[Fixture] Stale owner lock with no hook server. Demoting {_worker_id} to client.")
try: os.remove(str(_owner_lock))
except FileNotFoundError: pass
_is_owner = False
except Exception:
pass
except FileExistsError:
# Another worker owns it. We are a client. Wait for the hook server to come up.
_is_owner = False
print(f"[Fixture] {_worker_id} is a live_gui client (another worker owns the subprocess)")
_process: subprocess.Popen | None = None
_log_file = None
if not _is_owner:
# Client worker: wait for the owner's hook server, then yield a dummy handle.
_wait_start = time.time()
while time.time() - _wait_start < 30:
try:
resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if resp.status_code == 200:
print(f"[Fixture] Client {_worker_id} connected to hook server after {round(time.time() - _wait_start, 2)}s.")
try:
yield _LiveGuiHandle(None, gui_script, temp_workspace)
finally:
pass # Client does not kill the subprocess or clean up the workspace
return
except Exception:
pass
time.sleep(0.5)
# If we get here, the owner worker failed to start the hook server.
print(f"[Fixture] Client {_worker_id} timed out waiting for hook server. Yielding null handle.")
try:
yield _LiveGuiHandle(None, gui_script, temp_workspace)
finally:
pass
return
print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...")
os.makedirs("logs", exist_ok=True)
@@ -577,7 +629,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute())
env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute())
process = subprocess.Popen(
_process = subprocess.Popen(
["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
stdout=log_file,
stderr=log_file,
@@ -587,7 +639,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
)
diag.log_state("GUI Process PID", "N/A", process.pid)
diag.log_state("GUI Process PID", "N/A", _process.pid)
max_retries = 15
ready = False
@@ -601,7 +653,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.")
break
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
if process.poll() is not None:
if _process.poll() is not None:
print(f"[Fixture] {gui_script} process died unexpectedly during startup.")
break
time.sleep(0.5)
@@ -612,14 +664,18 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
if not ready:
diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.")
print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.")
kill_process_tree(process.pid)
kill_process_tree(_process.pid)
pytest.fail(f"Failed to start {gui_script} with test hooks.")
diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.")
try:
yield _LiveGuiHandle(process, gui_script, temp_workspace)
yield _LiveGuiHandle(_process, gui_script, temp_workspace)
finally:
if not _is_owner:
# Client worker: do not kill the subprocess or clean the workspace.
# The owner worker is responsible for both.
return
print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...")
# Reset the GUI state before shutting down
try:
@@ -629,17 +685,20 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]:
time.sleep(0.5)
except: pass
if process.poll() is None:
kill_process_tree(process.pid)
if _process.poll() is None:
kill_process_tree(_process.pid)
# On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks
# the handle is valid until waited on.
try:
process.wait(timeout=2)
_process.wait(timeout=2)
except:
pass
time.sleep(0.5)
log_file.close()
# Release the owner lock so the next pytest invocation can become owner.
try: os.remove(str(_owner_lock))
except FileNotFoundError: pass
# Cleanup temp workspace with retry for Windows file locks
for _ in range(5):
try: