diff --git a/project_history.toml b/project_history.toml index fc5a8bd0..4c5f3741 100644 --- a/project_history.toml +++ b/project_history.toml @@ -9,5 +9,5 @@ active = "main" [discussions.main] git_commit = "" -last_updated = "2026-06-08T01:05:27" +last_updated = "2026-06-08T15:44:57" history = [] diff --git a/src/api_hook_client.py b/src/api_hook_client.py index 55a77e0a..194cd4f0 100644 --- a/src/api_hook_client.py +++ b/src/api_hook_client.py @@ -404,6 +404,35 @@ class ApiHookClient: last_status["timeout"] = True return last_status + def get_io_pool_status(self) -> dict[str, Any]: + """ + Returns the controller's io_pool status: {idle, inflight}. + - idle: True if no jobs are currently in-flight (running or queued) + - inflight: integer count of jobs submitted via submit_io + Used by tests to wait for the pool to drain before submitting work + when sharing a live_gui session with prior tests. + [C: tests/test_api_hook_client.py:test_get_io_pool_status_*] + """ + result = self._make_request('GET', '/api/io_pool_status') + if not result or not isinstance(result, dict): + return {"idle": True, "inflight": 0} + return result + + def wait_io_pool_idle(self, timeout: float = 60.0, poll_interval: float = 0.2) -> bool: + """ + Blocks until the controller's io_pool reports idle=True or timeout. + Returns True on idle, False on timeout. Use this to ensure prior + tests' background work has completed before submitting new work. + [C: tests/test_live_workflow.py:test_full_live_workflow] + """ + start = time.time() + while time.time() - start < timeout: + status = self.get_io_pool_status() + if status.get("idle"): + return True + time.sleep(poll_interval) + return False + def post_project(self, project_data: dict) -> dict[str, Any]: return last_status diff --git a/src/api_hooks.py b/src/api_hooks.py index a65a856f..44906e54 100644 --- a/src/api_hooks.py +++ b/src/api_hooks.py @@ -134,6 +134,17 @@ class HookHandler(BaseHTTPRequestHandler): "error": getattr(controller, "_project_switch_error", None), } self.wfile.write(json.dumps(payload).encode("utf-8")) + elif self.path == "/api/io_pool_status": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + controller = _get_app_attr(app, "controller", None) + if controller is None: + payload = {"idle": True, "inflight": 0} + else: + inflight = getattr(controller, "_io_pool_inflight", 0) + payload = {"idle": inflight == 0, "inflight": inflight} + self.wfile.write(json.dumps(payload).encode("utf-8")) elif self.path == "/api/session": self.send_response(200) self.send_header("Content-Type", "application/json") diff --git a/src/app_controller.py b/src/app_controller.py index 2dadf19f..ae1f8d71 100644 --- a/src/app_controller.py +++ b/src/app_controller.py @@ -2263,13 +2263,51 @@ class AppController: at 4 workers (see src/io_pool.py) so the job may queue briefly if the pool is saturated. + The number of in-flight (running or queued) jobs is tracked via + self._io_pool_inflight, allowing tests to wait for the pool to drain + (see io_pool_idle() / wait_io_pool_idle()). This is needed because + the session-scoped live_gui fixture shares the controller across + tests; prior tests' io_pool workers must drain before subsequent + tests' submitted work can run. + Domain-specific threads (HookServer, WebSocketServer, MMA WorkerPool, asyncio loop) are NOT submitted here - they have their own lifecycle management. [SDM: src/app_controller.py:submit_io] """ - import concurrent.futures - return self._io_pool.submit(fn, *args, **kwargs) + if not hasattr(self, "_io_pool_inflight_lock"): + self._io_pool_inflight_lock = threading.Lock() + with self._io_pool_inflight_lock: + self._io_pool_inflight = getattr(self, "_io_pool_inflight", 0) + 1 + future = self._io_pool.submit(fn, *args, **kwargs) + future.add_done_callback(lambda _f: self._io_pool_inflight_done()) + return future + + def _io_pool_inflight_done(self) -> None: + """Decrement the in-flight io_pool counter. Called by future callback.""" + with self._io_pool_inflight_lock: + if getattr(self, "_io_pool_inflight", 0) > 0: + self._io_pool_inflight -= 1 + + def io_pool_idle(self) -> bool: + """True if no io_pool jobs are currently in-flight (running or queued). + + Useful for tests that share a live_gui session with prior tests: + if the io_pool is still processing jobs from a prior test, submitting + a new project switch would queue behind them and the switch would + not complete promptly. + [C: tests/test_live_workflow.py:test_full_live_workflow] + """ + return getattr(self, "_io_pool_inflight", 0) == 0 + + def wait_io_pool_idle(self, timeout: float = 60.0, poll_interval: float = 0.1) -> bool: + """Blocks until io_pool_idle() is True or timeout. Returns True on idle.""" + start = time.time() + while time.time() - start < timeout: + if self.io_pool_idle(): + return True + time.sleep(poll_interval) + return False def shutdown(self) -> None: """ diff --git a/src/io_pool.py b/src/io_pool.py index f05395c1..ac46afa8 100644 --- a/src/io_pool.py +++ b/src/io_pool.py @@ -17,7 +17,7 @@ the broken finalization chain. See commit log for details. from concurrent.futures import ThreadPoolExecutor -IO_POOL_MAX_WORKERS: int = 4 +IO_POOL_MAX_WORKERS: int = 8 IO_POOL_THREAD_NAME_PREFIX: str = "controller-io" diff --git a/tests/test_api_hook_client_io_pool.py b/tests/test_api_hook_client_io_pool.py new file mode 100644 index 00000000..f3bee620 --- /dev/null +++ b/tests/test_api_hook_client_io_pool.py @@ -0,0 +1,76 @@ +"""Tests for ApiHookClient.get_io_pool_status and wait_io_pool_idle. + +These methods allow tests sharing a live_gui session to wait for prior +tests' background io_pool work to drain before submitting new work. + +ANTI-SIMPLIFICATION: Tests must verify the exact endpoint URL, the poll +contract (returns on idle=True, not on partial), and the timeout behavior. +""" +import pytest +from unittest.mock import patch +import sys +import os + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from src.api_hook_client import ApiHookClient + + +def test_get_io_pool_status_calls_endpoint() -> None: + """get_io_pool_status hits GET /api/io_pool_status and returns the dict.""" + client = ApiHookClient() + with patch.object(client, "_make_request") as mock_make: + mock_make.return_value = {"idle": True, "inflight": 0} + status = client.get_io_pool_status() + assert status == {"idle": True, "inflight": 0} + mock_make.assert_any_call("GET", "/api/io_pool_status") + + +def test_get_io_pool_status_handles_empty_response() -> None: + """get_io_pool_status returns {idle: True, inflight: 0} on empty/invalid.""" + client = ApiHookClient() + with patch.object(client, "_make_request") as mock_make: + mock_make.return_value = None + status = client.get_io_pool_status() + assert status == {"idle": True, "inflight": 0} + + +def test_wait_io_pool_idle_returns_immediately_when_idle() -> None: + """wait_io_pool_idle returns True on first poll if already idle.""" + client = ApiHookClient() + with patch.object(client, "get_io_pool_status") as mock_status: + mock_status.return_value = {"idle": True, "inflight": 0} + result = client.wait_io_pool_idle(timeout=5.0) + assert result is True + assert mock_status.call_count == 1 + + +def test_wait_io_pool_idle_polls_then_returns_when_idle() -> None: + """wait_io_pool_idle polls multiple times until idle, then returns True.""" + client = ApiHookClient() + side_effects = [ + {"idle": False, "inflight": 3}, + {"idle": False, "inflight": 2}, + {"idle": False, "inflight": 1}, + {"idle": True, "inflight": 0}, + ] + with patch.object(client, "get_io_pool_status") as mock_status: + mock_status.side_effect = side_effects + with patch("time.sleep") as mock_sleep: + result = client.wait_io_pool_idle(timeout=10.0, poll_interval=0.1) + assert result is True + assert mock_status.call_count == 4 + assert mock_sleep.call_count == 3 + + +def test_wait_io_pool_idle_times_out_when_never_idle() -> None: + """wait_io_pool_idle returns False if pool never becomes idle.""" + client = ApiHookClient() + with patch.object(client, "get_io_pool_status") as mock_status: + mock_status.return_value = {"idle": False, "inflight": 5} + with patch("time.time") as mock_time: + # Simulate time progressing past the timeout + mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 100.0, 100.1] + with patch("time.sleep") as mock_sleep: + result = client.wait_io_pool_idle(timeout=5.0, poll_interval=0.1) + assert result is False diff --git a/tests/test_live_workflow.py b/tests/test_live_workflow.py index 4808b06d..db943870 100644 --- a/tests/test_live_workflow.py +++ b/tests/test_live_workflow.py @@ -41,12 +41,16 @@ def test_full_live_workflow(live_gui) -> None: assert client.wait_for_server(timeout=10) client.post_session(session_entries=[]) - # 0. Wait for any in-flight project switch to complete before starting. - # The session-scoped live_gui fixture shares the controller across all - # 48 live tests. Prior tests (especially test_extended_sims) may leave - # a project switch hanging in the io_pool. If we proceed without waiting, - # our new switch will be queued behind the hung one and is_project_stale() - # will return True, blocking AI ops. + # 0a. Wait for app warmup to complete. The warmup submits heavy-module + # import jobs directly to the io_pool (bypassing submit_io's counter); + # we wait for the warmup done event so SDK modules are guaranteed loaded + # before AI ops. + warmup_result = client.get_warmup_wait(timeout=60.0) + print(f"[TEST] Warmup result: {warmup_result}") + + # 0b. Wait for any in-flight project switch to complete before starting. + # If we proceed without waiting, our new switch will be queued behind + # the hung one and is_project_stale() will return True, blocking AI ops. pre_status = client.get_project_switch_status() if pre_status.get("in_progress"): print(f"\n[TEST] Waiting for prior project switch to complete: {pre_status}")