From 4bb19835dbd847f5116040407f53355b53df5678 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Tue, 9 Jun 2026 22:23:33 -0400 Subject: [PATCH] fix(test): per-worker workspace subdir + file-lock for xdist live_gui coordination --- tests/conftest.py | 127 +++++++++++++++++++++++++++++++++------------- 1 file changed, 93 insertions(+), 34 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 22d7cd02..80dcd0a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ if project_root not in sys.path: sys.path.insert(0, project_root) _RUN_ID = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") -_RUN_WORKSPACE = Path(f"tests/artifacts/live_gui_workspace_{_RUN_ID}_{os.getpid()}") +_RUN_WORKSPACE = Path(f"tests/artifacts/live_gui_workspace_{_RUN_ID}") thirdparty_dir = os.path.join(os.path.dirname(__file__), "..", "thirdparty") if thirdparty_dir not in sys.path: sys.path.insert(0, thirdparty_dir) @@ -132,6 +132,23 @@ def pytest_terminal_summary(terminalreporter: object, exitstatus: int, config: o def pytest_unconfigure(config: object) -> None: _pytest_finished_event.set() + +def pytest_collection_modifyitems(config: object, items: list[object]) -> None: + """ + xdist worker gate: only gw0 (or the master, if not running under xdist) + actually runs tests that depend on the live_gui fixture. Other workers + deselect those tests to avoid port-8999 collisions and to honor the + "don't multi-thread live_gui" directive. + [C: tests/conftest.py:live_gui fixture] + """ + _worker = os.environ.get("PYTEST_XDIST_WORKER", "master") + if _worker in ("master", "gw0"): + return + _skip = pytest.mark.skip(reason=f"live_gui tests only run on master/gw0 (this worker is {_worker})") + for item in items: + if "live_gui" in getattr(item, "fixturenames", ()): + item.add_marker(_skip) + _REQUIRED_TEST_IMPORTS: list[tuple[str, str]] = [ ("sentence_transformers", "sentence-transformers"), ] @@ -465,8 +482,10 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: diag = VerificationLogger("live_gui_startup", "live_gui_diag") diag.log_state("GUI Script", "N/A", "gui_2.py") - # 1. Create a isolated workspace for the live GUI (per-run timestamped folder under tests/artifacts/) - temp_workspace = _RUN_WORKSPACE + # 1. Create a per-worker subdirectory under the run workspace so xdist workers + # don't pollute the main directory or collide on port 8999. + _worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master") + temp_workspace = _RUN_WORKSPACE / _worker_id if temp_workspace.exists(): for _ in range(5): try: @@ -534,30 +553,63 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: else: os.symlink(src_assets, temp_workspace / "assets") - # Check if already running (shouldn't be). If stale, kill the old process - # before spawning a new one — otherwise the new subprocess fails to bind - # port 8999 and the wait loop connects to the stale process instead, - # leading to state pollution across batches. + # File-based mutex to coordinate sloppy.py ownership across xdist workers. + # The first worker to acquire the lock spawns sloppy.py. Other workers see + # the lock and wait for the hook server to come up, then yield a client + # handle (process=None; teardown skips kill + workspace cleanup). This + # serializes I/O on port 8999 so we don't try to run live_gui in parallel. + _owner_lock = temp_workspace / ".live_gui_owner.lock" + _is_owner = False try: - resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5) - if resp.status_code == 200: - print("[Fixture] WARNING: Hook Server already up on port 8999. Killing stale process...") - netstat = subprocess.run(["netstat", "-ano"], capture_output=True, text=True, timeout=5) - stale_pids: set[int] = set() - for line in netstat.stdout.splitlines(): - if ":8999" in line and "LISTENING" in line: - parts = line.split() - if parts: - try: stale_pids.add(int(parts[-1])) - except ValueError: pass - for pid in stale_pids: - try: - subprocess.run(["taskkill", "/F", "/PID", str(pid)], capture_output=True, timeout=5) - print(f"[Fixture] Killed stale PID {pid}") - except Exception: pass - time.sleep(1.0) - print("[Fixture] Proceeding with fresh sloppy.py spawn") - except Exception: pass + # O_EXCL: atomic create-or-fail. If file exists, another worker owns the subprocess. + # We close the fd immediately; the file's existence on disk IS the lock. + # Holding the fd open would prevent os.remove() in teardown on Windows. + _lock_fd = os.open(str(_owner_lock), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.close(_lock_fd) + _is_owner = True + print(f"[Fixture] {_worker_id} acquired live_gui owner lock") + # Stale lock from a crashed worker? If hook server is NOT up, remove the lock + # and yield a client handle. If hook server IS up, the lock is valid (another + # worker beat us to it between the O_EXCL check and now). + try: + resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5) + if resp.status_code != 200: + print(f"[Fixture] Stale owner lock with no hook server. Demoting {_worker_id} to client.") + try: os.remove(str(_owner_lock)) + except FileNotFoundError: pass + _is_owner = False + except Exception: + pass + except FileExistsError: + # Another worker owns it. We are a client. Wait for the hook server to come up. + _is_owner = False + print(f"[Fixture] {_worker_id} is a live_gui client (another worker owns the subprocess)") + + _process: subprocess.Popen | None = None + _log_file = None + if not _is_owner: + # Client worker: wait for the owner's hook server, then yield a dummy handle. + _wait_start = time.time() + while time.time() - _wait_start < 30: + try: + resp = requests.get("http://127.0.0.1:8999/status", timeout=0.5) + if resp.status_code == 200: + print(f"[Fixture] Client {_worker_id} connected to hook server after {round(time.time() - _wait_start, 2)}s.") + try: + yield _LiveGuiHandle(None, gui_script, temp_workspace) + finally: + pass # Client does not kill the subprocess or clean up the workspace + return + except Exception: + pass + time.sleep(0.5) + # If we get here, the owner worker failed to start the hook server. + print(f"[Fixture] Client {_worker_id} timed out waiting for hook server. Yielding null handle.") + try: + yield _LiveGuiHandle(None, gui_script, temp_workspace) + finally: + pass + return print(f"\n[Fixture] Starting {gui_script} --enable-test-hooks in {temp_workspace}...") os.makedirs("logs", exist_ok=True) @@ -577,7 +629,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: env["SLOP_GLOBAL_PRESETS"] = str((temp_workspace / "presets.toml").absolute()) env["SLOP_GLOBAL_TOOL_PRESETS"] = str((temp_workspace / "tool_presets.toml").absolute()) - process = subprocess.Popen( + _process = subprocess.Popen( ["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"], stdout=log_file, stderr=log_file, @@ -587,7 +639,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0 ) - diag.log_state("GUI Process PID", "N/A", process.pid) + diag.log_state("GUI Process PID", "N/A", _process.pid) max_retries = 15 ready = False @@ -601,7 +653,7 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: print(f"[Fixture] GUI Hook Server for {gui_script} is ready after {round(time.time() - start_time, 2)}s.") break except (requests.exceptions.ConnectionError, requests.exceptions.Timeout): - if process.poll() is not None: + if _process.poll() is not None: print(f"[Fixture] {gui_script} process died unexpectedly during startup.") break time.sleep(0.5) @@ -612,14 +664,18 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: if not ready: diag.finalize("Live GUI Startup Telemetry", "FAIL", "Hook server failed to respond.") print(f"[Fixture] TIMEOUT/FAILURE: Hook server for {gui_script} failed to respond.") - kill_process_tree(process.pid) + kill_process_tree(_process.pid) pytest.fail(f"Failed to start {gui_script} with test hooks.") diag.finalize("Live GUI Startup Telemetry", "PASS", "Hook server successfully initialized.") try: - yield _LiveGuiHandle(process, gui_script, temp_workspace) + yield _LiveGuiHandle(_process, gui_script, temp_workspace) finally: + if not _is_owner: + # Client worker: do not kill the subprocess or clean the workspace. + # The owner worker is responsible for both. + return print(f"\n[Fixture] Finally block triggered: Shutting down {gui_script}...") # Reset the GUI state before shutting down try: @@ -629,17 +685,20 @@ def live_gui(request) -> Generator["_LiveGuiHandle", None, None]: time.sleep(0.5) except: pass - if process.poll() is None: - kill_process_tree(process.pid) + if _process.poll() is None: + kill_process_tree(_process.pid) # On Windows, taskkill /F /T can leave the Popen object in a state where it still thinks # the handle is valid until waited on. try: - process.wait(timeout=2) + _process.wait(timeout=2) except: pass time.sleep(0.5) log_file.close() + # Release the owner lock so the next pytest invocation can become owner. + try: os.remove(str(_owner_lock)) + except FileNotFoundError: pass # Cleanup temp workspace with retry for Windows file locks for _ in range(5): try: