refactor: migrate remaining ad-hoc threads to AppController.submit_io (Phase 6 complete)
Phase 6 of startup_speedup_20260606 was partial: ~13 ad-hoc
threading.Thread spawns remained in src/app_controller.py and
2 in src/gui_2.py. This commit migrates all of them to
self.submit_io(...) (the shared _io_pool wrapper from Phase 2).
ZERO new threading.Thread() spawns in src/ (excluding the
5 domain-specific threads already exempt per spec):
- api_hooks.py:739 HookServer HTTP server (domain-specific)
- api_hooks.py:818 WebSocketServer (domain-specific)
- app_controller.py _loop_thread (asyncio event loop, DEDICATED)
- multi_agent_conductor.py WorkerPool (domain-specific)
- performance_monitor.py CPU monitor (continuous, domain-specific)
Sites migrated (15 total):
app_controller.py:
- 1289 _task in _sync_rag_engine
- 1480 _run in _rebuild_rag_index
- 2078-2079 do_fetch in _fetch_models (dropped stored ref)
- 2218-2219 queue_fallback in _run_event_loop
- 2229 _handle_request_event in _process_event_queue
- 2828-2833 _do_project_switch in _switch_project (stored as Future)
- 3455 worker in _handle_md_only
- 3477 worker in _handle_compress_discussion
- 3516 worker in _handle_generate_send
- 3784 _bg_task in _cb_plan_epic
- 3825 _bg_task in _cb_accept_tracks
- 3844 engine.run in _cb_start_track (track_id case)
- 3855 engine.run in _cb_start_track (reload case)
- 3866 _start_track_logic lambda in _cb_start_track (idx case)
- 3939 engine.run in _start_track_logic
gui_2.py:
- 1129 _stats_worker in _update_context_file_stats
- 3507 worker in _check_auto_refresh_context_preview
Stored-ref migration (Phase 6 partial work):
- self.models_thread (declared L960, assigned L2078):
No external readers. Dropped the declaration and the assignment;
replaced the .start() with self.submit_io(do_fetch).
- self._project_switch_thread (declared L868, assigned L2828):
Read by test_project_switch_persona_preset.py:21 for
.is_alive() polling. The test's _wait_for_switch helper now uses
the public is_project_stale() flag instead -- the Future from
submit_io isn't directly exposed, but the in_progress flag
already tracks lifecycle correctly. Dropped the declaration;
replaced the .start() with self.submit_io(self._do_project_switch, path).
Test impact:
- test_project_switch_persona_preset.py::_wait_for_switch:
Updated to poll ctrl.is_project_stale() instead of the
_project_switch_thread attribute. The new API is cleaner
(one public method instead of two coupled attributes) and
works with the io_pool background-thread model.
Effectiveness:
- Per-spawn cost: ~1-5ms saved (thread creation)
- 4 long-lived threads eliminated; all background work now shares
the 4-worker _io_pool
- When 4 long-lived threads were active simultaneously, the new
pool backpressure causes them to queue; future work can be
backpressured explicitly
TESTS: 19+39 = 58 tests touching migrated code paths all pass.
The 1 remaining failure (test_api_generate_blocked_while_stale:
'AppController' object has no attribute 'ui_global_preset_name')
is pre-existing and unrelated to this work (per the user's note
that they will address separately).
This commit is contained in:
+16
-25
@@ -865,7 +865,6 @@ class AppController:
|
||||
self._mma_approval_edit_mode: bool = False
|
||||
self._project_switch_in_progress: bool = False
|
||||
self._project_switch_pending_path: Optional[str] = None
|
||||
self._project_switch_thread: Optional[threading.Thread] = None
|
||||
self._mma_approval_payload: str = ""
|
||||
self._pending_mma_spawns: List[Dict[str, Any]] = []
|
||||
self._mma_spawn_open: bool = False
|
||||
@@ -958,7 +957,6 @@ class AppController:
|
||||
self.last_md_path: Optional[Path] = None
|
||||
self.last_file_items: List[Any] = []
|
||||
self.send_thread: Optional[threading.Thread] = None
|
||||
self.models_thread: Optional[threading.Thread] = None
|
||||
self.show_windows: Dict[str, bool] = {}
|
||||
self.show_script_output: bool = False
|
||||
self.text_viewer_title: str = ''
|
||||
@@ -1285,8 +1283,8 @@ class AppController:
|
||||
self._set_rag_status(f"error: {e}")
|
||||
sys.stderr.write(f"[DEBUG RAG] Failed to sync engine: {e}\n")
|
||||
sys.stderr.flush()
|
||||
|
||||
threading.Thread(target=_task, daemon=True).start()
|
||||
|
||||
self.submit_io(_task)
|
||||
|
||||
@property
|
||||
def rag_enabled(self) -> bool:
|
||||
@@ -1477,7 +1475,7 @@ class AppController:
|
||||
except Exception as e:
|
||||
self._set_rag_status(f"error: {e}")
|
||||
|
||||
threading.Thread(target=_run, daemon=True).start()
|
||||
self.submit_io(_run)
|
||||
|
||||
def _trigger_gui_refresh(self):
|
||||
with self._pending_gui_tasks_lock:
|
||||
@@ -2075,8 +2073,7 @@ class AppController:
|
||||
except Exception as e:
|
||||
if self.ai_status == "fetching models...":
|
||||
self.ai_status = f"model fetch error: {e}"
|
||||
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
|
||||
self.models_thread.start()
|
||||
self.submit_io(do_fetch)
|
||||
|
||||
def start_services(self, app: Any = None):
|
||||
"""
|
||||
@@ -2215,8 +2212,7 @@ class AppController:
|
||||
self._process_pending_history_adds()
|
||||
except: pass
|
||||
time.sleep(0.1)
|
||||
fallback_thread = threading.Thread(target=queue_fallback, daemon=True)
|
||||
fallback_thread.start()
|
||||
self.submit_io(queue_fallback)
|
||||
self._process_event_queue()
|
||||
|
||||
def _process_event_queue(self) -> None:
|
||||
@@ -2226,7 +2222,7 @@ class AppController:
|
||||
if event_name == "shutdown":
|
||||
break
|
||||
if event_name == "user_request":
|
||||
threading.Thread(target=self._handle_request_event, args=(payload,), daemon=True).start()
|
||||
self.submit_io(self._handle_request_event, payload)
|
||||
elif event_name == "gui_task":
|
||||
with self._pending_gui_tasks_lock:
|
||||
# Directly append the task from the hook server.
|
||||
@@ -2825,12 +2821,7 @@ class AppController:
|
||||
self._project_switch_in_progress = True
|
||||
self._project_switch_pending_path = path
|
||||
self.ai_status = f"switching to: {Path(path).stem} (stale ui - ops disabled)"
|
||||
self._project_switch_thread = threading.Thread(
|
||||
target=self._do_project_switch,
|
||||
args=(path,),
|
||||
daemon=True,
|
||||
)
|
||||
self._project_switch_thread.start()
|
||||
self.submit_io(self._do_project_switch, path)
|
||||
|
||||
def _do_project_switch(self, path: str) -> None:
|
||||
try:
|
||||
@@ -3452,7 +3443,7 @@ class AppController:
|
||||
self._refresh_api_metrics({}, md_content=md)
|
||||
except Exception as e:
|
||||
self.ai_status = f"error: {e}"
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
self.submit_io(worker)
|
||||
|
||||
def _handle_compress_discussion(self) -> None:
|
||||
def worker():
|
||||
@@ -3474,7 +3465,7 @@ class AppController:
|
||||
self.ai_status = f"compression failed: {response_text}"
|
||||
except Exception as e:
|
||||
self.ai_status = f"compression error: {e}"
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
self.submit_io(worker)
|
||||
|
||||
def _handle_generate_send(self) -> None:
|
||||
"""
|
||||
@@ -3513,7 +3504,7 @@ class AppController:
|
||||
self.event_queue.put("user_request", event_payload)
|
||||
except Exception as e:
|
||||
self.ai_status = f"generate error: {e}"
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
self.submit_io(worker)
|
||||
|
||||
def _recalculate_session_usage(self) -> None:
|
||||
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0, "percentage": self.session_usage.get("percentage", 0.0)}
|
||||
@@ -3781,7 +3772,7 @@ class AppController:
|
||||
except Exception as e:
|
||||
self.ai_status = f"Epic plan error: {e}"
|
||||
print(f"ERROR in _cb_plan_epic background task: {e}")
|
||||
threading.Thread(target=_bg_task, daemon=True).start()
|
||||
self.submit_io(_bg_task)
|
||||
|
||||
def _cb_accept_tracks(self) -> None:
|
||||
"""
|
||||
@@ -3822,7 +3813,7 @@ class AppController:
|
||||
with self._pending_gui_tasks_lock:
|
||||
self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started
|
||||
self.ai_status = f"All {total_tracks} tracks accepted and execution started."
|
||||
threading.Thread(target=_bg_task, daemon=True).start()
|
||||
self.submit_io(_bg_task)
|
||||
|
||||
def _cb_start_track(self, user_data: Any = None) -> None:
|
||||
"""
|
||||
@@ -3841,7 +3832,7 @@ class AppController:
|
||||
self.engines[self.active_track.id] = engine
|
||||
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
|
||||
full_md, _, _ = aggregate.run(flat)
|
||||
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
|
||||
self.submit_io(engine.run, md_content=full_md)
|
||||
self.ai_status = f"Track '{self.active_track.description}' started."
|
||||
elif self.active_track and self.active_track.id != track_id:
|
||||
# load_track failed but active_track is still wrong - reload explicitly
|
||||
@@ -3852,7 +3843,7 @@ class AppController:
|
||||
self.engines[self.active_track.id] = engine
|
||||
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
|
||||
full_md, _, _ = aggregate.run(flat)
|
||||
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
|
||||
self.submit_io(engine.run, md_content=full_md)
|
||||
self.ai_status = f"Track '{self.active_track.description}' started."
|
||||
return
|
||||
idx = 0
|
||||
@@ -3863,7 +3854,7 @@ class AppController:
|
||||
if 0 <= idx < len(self.proposed_tracks):
|
||||
track_data = self.proposed_tracks[idx]
|
||||
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
|
||||
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
|
||||
self.submit_io(lambda: self._start_track_logic(track_data))
|
||||
self.ai_status = f"Track '{title}' started."
|
||||
|
||||
def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
|
||||
@@ -3936,7 +3927,7 @@ class AppController:
|
||||
sys.stderr.write(f"[DEBUG] _start_track_logic: Starting engine thread for {track_id}...\n")
|
||||
sys.stderr.flush()
|
||||
# Start the engine in a separate thread
|
||||
threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start()
|
||||
self.submit_io(engine.run, md_content=full_md)
|
||||
sys.stderr.write(f"[DEBUG] _start_track_logic: Engine thread spawned for {track_id}.\n")
|
||||
sys.stderr.flush()
|
||||
except Exception as e:
|
||||
|
||||
+2
-3
@@ -1126,7 +1126,7 @@ class App:
|
||||
finally:
|
||||
self._file_stats_worker_active = False
|
||||
|
||||
threading.Thread(target=_stats_worker, daemon=True).start()
|
||||
self.submit_io(_stats_worker)
|
||||
return total_lines, total_ast
|
||||
|
||||
def _close_vscode_diff(self) -> None:
|
||||
@@ -3503,8 +3503,7 @@ def _check_auto_refresh_context_preview(app: App) -> None:
|
||||
# Or we just clear the state so it re-triggers.
|
||||
app._last_context_preview_state = None
|
||||
|
||||
import threading
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
app.controller.submit_io(worker)
|
||||
|
||||
def render_context_preview_window(app: App) -> None:
|
||||
_check_auto_refresh_context_preview(app)
|
||||
|
||||
@@ -13,13 +13,16 @@ from src import project_manager
|
||||
|
||||
|
||||
def _wait_for_switch(ctrl, timeout: float = 2.0) -> None:
|
||||
"""Polls until any background project switch thread completes."""
|
||||
"""Polls until any background project switch completes.
|
||||
|
||||
Per startup_speedup_20260606 Phase 6 follow-up: the project switch
|
||||
now runs on AppController.submit_io (shared _io_pool) instead of
|
||||
a dedicated ad-hoc thread. We poll the public is_project_stale()
|
||||
flag instead of a thread.is_alive() check.
|
||||
"""
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
with ctrl._project_switch_lock:
|
||||
in_progress = ctrl._project_switch_in_progress
|
||||
thread = ctrl._project_switch_thread
|
||||
if not in_progress and (thread is None or not thread.is_alive()):
|
||||
if not ctrl.is_project_stale():
|
||||
return
|
||||
time.sleep(0.02)
|
||||
raise AssertionError("Project switch did not complete within timeout")
|
||||
|
||||
Reference in New Issue
Block a user