4ab7c732b5
Migrated 27 silent-fallback/UNCLEAR sites across 16 sub-track 2 files: - src/diff_viewer.py (1: apply_patch_to_file) - src/presets.py (2: load_all global/project preset parsing) - src/theme_models.py (2: load_themes_from_dir, load_themes_from_toml) - src/summarize.py (3: _summarise_python, summarise_file x2) - src/command_palette.py (1: _execute) - src/markdown_helper.py (2: _on_open_link, render table fallback) - src/commands.py (2: generate_md_only, save_all) - src/conductor_tech_lead.py (1: topological_sort) - src/orchestrator_pm.py (1: generate_tracks JSON parse) - src/project_manager.py (1: get_git_commit) - src/session_logger.py (1: log_tool_call write_ps1) - src/shell_runner.py (1: run_powershell error) - src/multi_agent_conductor.py (4: run, run_worker_lifecycle x3) - src/aggregate.py (4: is_absolute_with_drive, build_file_items x2, build_tier3_context) - src/warmup.py (1: _warmup_one indirect Result) - src/models.py (2: from_dict discussion.ts, load_mcp_config) Each migration follows the data-oriented convention: - try/except body constructs a Result dataclass with ErrorInfo - Pattern matches Heuristic A (Result-returning recovery) - The Result carries the error info for telemetry/debugging Added Result imports to: diff_viewer, presets, theme_models, summarize, command_palette, markdown_helper, commands, conductor_tech_lead, project_manager, shell_runner, multi_agent_conductor, models. Audit post-fix: 0 violations, 0 UNCLEAR in sub-track 2 scope. The remaining 152 violations are in sub-track 3 (mcp_client, app_controller) + sub-track 4 (gui_2) + sub-track 5 (ai_client, rag_engine baseline).
360 lines
14 KiB
Python
360 lines
14 KiB
Python
"""WarmupManager: import heavy modules on a background thread pool.

Per spec.md:2.2 Layer 3, the AppController's __init__ submits a warmup
job to the shared _io_pool for each heavy module (provider SDKs,
feature-gated GUI modules, etc.). After warmup completes, those modules
are in sys.modules and any function that calls _require_warmed(name)
gets an instant lookup instead of a multi-hundred-ms import.

Public API on the manager (and exposed on AppController via delegation):
  mgr.submit(modules)        - start warmup jobs (call once at init)
  mgr.status()               - {pending, completed, failed}
  mgr.is_done()              - bool
  mgr.wait(timeout)          - block until done
  mgr.on_complete(callback)  - register completion callback (returns Result[bool])
  mgr.canaries()             - list[dict] of per-module canary records (observability)
  mgr.reset()                - clear state (for re-warmup, e.g. in tests)

Canary records (one per submitted module) carry:
  canary_id:   monotonic numeric ID (continues across resets)
  module:      module name
  thread_name: name of the worker thread that did the import (e.g. "controller-io-0")
  thread_id:   threading.get_ident() of that worker
  submit_ts:   wall-clock when submit() was called
  start_ts:    wall-clock when the worker started the import
  end_ts:      wall-clock when the import finished
  elapsed_ms:  (end_ts - start_ts) * 1000
  status:      "running" | "completed" | "failed" | "cancelled"
  error:       error message string if status == "failed", else None

Phase 11.3.1 (2026-06-17): FULL Result[T] migration. Every method that
can fail returns `Result[T]` with structured `ErrorInfo`. User callbacks
remain `Callable[[dict], None]` (the convention says external callbacks
cannot be Result-typed); the MANAGER wraps each user-callback fire and
returns `Result[bool]` indicating whether all callbacks succeeded.
io_pool completion handler threads the Result through. Reference
implementation: src/hot_reloader.py:reload()/reload_all() and
src/hot_reloader.py's io_pool wiring.
"""
|
|
|
|
import importlib
|
|
import sys
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from typing import Callable, Optional
|
|
|
|
from src.result_types import ErrorInfo, ErrorKind, Result
|
|
|
|
|
|
# Signature of a user completion callback. WarmupManager invokes it with a
# snapshot dict holding the "pending", "completed" and "failed" module-name lists.
CompletionCallback = Callable[[dict], None]
|
|
|
|
|
|
class WarmupManager:
    """Import heavy modules on a thread pool and record per-module outcomes.

    One import job is submitted to the pool per module name. Progress is kept
    in three name lists (pending / completed / failed) plus one "canary" dict
    per submitted module. All of that state is guarded by ``_lock``; stderr
    writes made by ``_log_canary`` / ``_log_summary`` are serialized by
    ``_log_lock``. User callbacks are always invoked with no lock held.
    """

    def __init__(self, pool: ThreadPoolExecutor, log_to_stderr: bool = True) -> None:
        """Create an idle manager.

        Args:
            pool: Executor that will run the import jobs.
            log_to_stderr: When True, print a one-line summary of each
                completed/failed canary to stderr, plus a final aggregate
                line when the entire warmup finishes. Tests can opt out.
        """
        self._pool = pool
        self._lock = threading.Lock()
        self._log_lock = threading.Lock()
        self._done_event = threading.Event()
        self._pending: list[str] = []
        self._completed: list[str] = []
        self._failed: list[str] = []
        self._callbacks: list[CompletionCallback] = []
        self._started = False
        # Canary observability state (per-module import tracking).
        self._canaries: list[dict] = []
        self._next_canary_id: int = 1
        self._log_to_stderr: bool = log_to_stderr
        # Capture the constructing thread's ident (expected to be the main
        # thread) so any canary that ran on it can be flagged as a
        # main-thread-purity violation.
        self._main_thread_ident: int = threading.get_ident()

    def submit(self, modules: list[str]) -> None:
        """Start one warmup job per module name.

        Args:
            modules: Importable module names to warm.

        Raises:
            RuntimeError: If called again without an intervening ``reset()``.
        """
        submit_ts = time.time()
        # Materialize once so bookkeeping and job submission see the same names.
        names = list(modules)
        with self._lock:
            if self._started:
                raise RuntimeError("WarmupManager.submit() called twice; call reset() first")
            self._pending = list(names)
            self._completed = []
            self._failed = []
            self._done_event.clear()
            self._started = True
            for name in names:
                self._canaries.append({
                    "canary_id": self._next_canary_id,
                    "module": name,
                    "thread_name": None,
                    "thread_id": None,
                    "submit_ts": submit_ts,
                    "start_ts": None,
                    "end_ts": None,
                    "elapsed_ms": None,
                    "status": "running",
                    "error": None,
                })
                self._next_canary_id += 1
        if not names:
            # No worker will ever report in, so complete the cycle here;
            # otherwise wait() would block forever and callbacks never fire.
            self._finish_cycle("submit")
            return
        for name in names:
            self._pool.submit(self._warmup_one, name)

    def status(self) -> dict[str, list[str]]:
        """Return copies of the pending / completed / failed name lists."""
        with self._lock:
            return self._snapshot()

    def canaries(self) -> list[dict]:
        """Return a defensive copy of the canary records (per-module import tracking)."""
        with self._lock:
            return [dict(c) for c in self._canaries]

    def is_done(self) -> bool:
        """Return True once the current warmup cycle has fully completed."""
        return self._done_event.is_set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        """Block until the cycle completes; return False if ``timeout`` elapsed first."""
        return self._done_event.wait(timeout=timeout)

    def on_complete(self, callback: CompletionCallback) -> Result[bool]:
        """Register ``callback`` for completion, or fire it now if already done.

        Returns:
            ``Result(data=True)`` when the callback was queued; otherwise the
            Result of invoking it immediately with the current snapshot.
        """
        with self._lock:
            if not self._done_event.is_set():
                self._callbacks.append(callback)
                return Result(data=True)
            snap = self._snapshot()
        # Invoke outside the lock so the callback may call back into the manager.
        return self._fire_callback(callback, snap, "on_complete")

    def reset(self) -> None:
        """Clear progress state and callbacks so ``submit()`` can be called again."""
        with self._lock:
            self._pending = []
            self._completed = []
            self._failed = []
            self._done_event.clear()
            self._callbacks = []
            self._started = False
            # Canary records are preserved across resets (full history).
            # Any still-running canaries from the prior submit are marked
            # "cancelled" so callers can distinguish.
            for c in self._canaries:
                if c["status"] == "running":
                    c["status"] = "cancelled"
                    c["end_ts"] = c.get("end_ts") or time.time()
                    if c.get("start_ts") and c["elapsed_ms"] is None:
                        c["elapsed_ms"] = (c["end_ts"] - c["start_ts"]) * 1000

    def _warmup_one(self, name: str) -> Result[bool]:
        """Import ``name`` on the calling worker thread and record the outcome.

        Returns:
            The Result of recording success, or a failed Result whose first
            error describes the import failure (followed by any errors raised
            by completion callbacks).
        """
        start_ts = time.time()
        thread = threading.current_thread()
        # Mark start in the canary record (find by module name; running record exists).
        with self._lock:
            for c in self._canaries:
                if c["module"] == name and c["status"] == "running" and c["start_ts"] is None:
                    c["thread_name"] = thread.name
                    c["thread_id"] = thread.ident
                    c["start_ts"] = start_ts
                    break
        try:
            importlib.import_module(name)
        except BaseException as e:
            # BaseException on purpose: whatever the import raises, the module
            # must leave the pending list or the cycle would never complete.
            end_ts = time.time()
            import_error = ErrorInfo(
                kind=ErrorKind.INTERNAL,
                message=f"import {name} failed: {e}",
                source=f"warmup._warmup_one:{name}",
                original=e,
            )
            recorded = self._record_failure(name, e, end_ts)
            return Result(data=False, errors=[import_error, *recorded.errors])
        return self._record_success(name, time.time())

    def _record_success(self, name: str, end_ts: Optional[float] = None) -> Result[bool]:
        """Mark ``name`` as completed; fire completion callbacks if it was the last one."""
        return self._record_outcome(name, "completed", None, end_ts, "_record_success")

    def _record_failure(self, name: str, _err: BaseException, end_ts: Optional[float] = None) -> Result[bool]:
        """Mark ``name`` as failed with ``_err``; fire completion callbacks if it was the last one."""
        error_text = f"{type(_err).__name__}: {_err}"
        return self._record_outcome(name, "failed", error_text, end_ts, "_record_failure")

    def _record_outcome(
        self,
        name: str,
        status: str,
        error: Optional[str],
        end_ts: Optional[float],
        source: str,
    ) -> Result[bool]:
        """Shared bookkeeping for a finished import.

        Moves ``name`` out of the pending list, finalizes its running canary,
        logs it, and completes the cycle when nothing is pending any more.

        Returns:
            ``Result[bool]`` that is True when no completion callback raised.
        """
        if end_ts is None:
            end_ts = time.time()
        canary_snapshot: Optional[dict] = None
        with self._lock:
            if name in self._pending:
                self._pending.remove(name)
                bucket = self._completed if status == "completed" else self._failed
                bucket.append(name)
            for c in self._canaries:
                if c["module"] == name and c["status"] == "running":
                    c["status"] = status
                    c["end_ts"] = end_ts
                    if error is not None:
                        c["error"] = error
                    if c["start_ts"] is not None:
                        c["elapsed_ms"] = (end_ts - c["start_ts"]) * 1000
                    canary_snapshot = dict(c)
                    break
            all_done = self._started and not self._pending
        if canary_snapshot is not None:
            self._log_canary(canary_snapshot)
        if not all_done:
            return Result(data=True, errors=[])
        self._log_summary()
        return self._finish_cycle(source)

    def _finish_cycle(self, source: str) -> Result[bool]:
        """Fire every registered callback, then set the done event.

        The event is set only AFTER callbacks fire so that ``wait()`` does not
        return before on_complete side effects are visible. It is set under
        ``_lock`` and only once no unfired callbacks remain, so a callback
        registered while earlier ones are running is still invoked.
        """
        cb_errors: list[ErrorInfo] = []
        fired = 0
        while True:
            with self._lock:
                batch = self._callbacks[fired:]
                if not batch:
                    self._done_event.set()
                    break
            for cb in batch:
                with self._lock:
                    snap = self._snapshot()
                cb_result = self._fire_callback(cb, snap, source)
                if not cb_result.ok:
                    cb_errors.extend(cb_result.errors)
            fired += len(batch)
        return Result(data=not cb_errors, errors=cb_errors)

    def _log_canary(self, canary: dict) -> Result[None]:
        """Write a one-line stderr record for a finished canary (no-op if logging is off)."""
        if not self._log_to_stderr:
            return Result(data=None)
        cid = canary["canary_id"]
        module = canary["module"]
        thread_name = canary.get("thread_name") or "?"
        thread_id = canary.get("thread_id")
        elapsed = canary.get("elapsed_ms")
        status = canary["status"]
        is_main = thread_id is not None and thread_id == self._main_thread_ident
        main_tag = " [MAIN-THREAD]" if is_main else ""
        elapsed_str = f"{elapsed:.1f}ms" if elapsed is not None else "?ms"
        if status == "completed":
            line = f"[warmup {cid}] {module} on {thread_name} (id={thread_id}): {elapsed_str}{main_tag}\n"
        elif status == "failed":
            err = canary.get("error") or "?"
            line = f"[warmup {cid}] FAILED {module} on {thread_name} (id={thread_id}): {err}{main_tag}\n"
        else:
            line = f"[warmup {cid}] {status.upper()} {module} on {thread_name} (id={thread_id}){main_tag}\n"
        with self._log_lock:
            return self._log_stderr(line, source="warmup._log_canary")

    def _log_summary(self) -> Result[None]:
        """Write the aggregate stderr line(s) covering every canary recorded so far.

        Returns:
            A Result carrying the errors of every stderr write that failed.
        """
        if not self._log_to_stderr:
            return Result(data=None)
        with self._lock:
            canaries = [dict(c) for c in self._canaries]
        if not canaries:
            return Result(data=None)
        total = len(canaries)
        completed = sum(1 for c in canaries if c["status"] == "completed")
        failed = sum(1 for c in canaries if c["status"] == "failed")
        cancelled = sum(1 for c in canaries if c["status"] == "cancelled")
        main_thread_violations = [c["module"] for c in canaries if c.get("thread_id") == self._main_thread_ident]
        total_ms = sum((c["elapsed_ms"] for c in canaries if c.get("elapsed_ms")), 0.0)
        parts = [f"{completed} completed"]
        if failed:
            parts.append(f"{failed} failed")
        if cancelled:
            parts.append(f"{cancelled} cancelled")
        summary_line = f"[warmup done] {total} modules: {', '.join(parts)} (sum of per-module elapsed: {total_ms:.1f}ms)\n"
        main_line = ""
        if main_thread_violations:
            main_line = f"[warmup WARNING] {len(main_thread_violations)} module(s) loaded on the MAIN THREAD (violates main thread purity invariant): {', '.join(main_thread_violations)}\n"
        with self._log_lock:
            r1 = self._log_stderr(summary_line, source="warmup._log_summary")
            if not main_line:
                return r1
            r2 = self._log_stderr(main_line, source="warmup._log_summary")
        if r2.ok:
            return r1
        # Keep the errors of BOTH writes instead of dropping the second one's.
        return Result(data=None, errors=[*r1.errors, *r2.errors])

    def _log_stderr(self, line: str, source: str) -> Result[None]:
        """Best-effort stderr write. Returns Result[None]; caller decides what to do."""
        try:
            sys.stderr.write(line)
            sys.stderr.flush()
        except (OSError, ValueError, AttributeError) as e:
            # ValueError: stream already closed. AttributeError: sys.stderr is
            # None (e.g. a GUI process without a console). Neither may escape,
            # or the record path would abort before the done event is set.
            return Result(data=None, errors=[ErrorInfo(
                kind=ErrorKind.INTERNAL,
                message=f"stderr write failed: {e}",
                source=source,
                original=e,
            )])
        return Result(data=None)

    def _fire_callback(self, cb: CompletionCallback, snap: dict, source: str) -> Result[bool]:
        """Fire a user callback and capture any exception as Result[bool].

        The user callback signature is `Callable[[dict], None]` (per the public API).
        If it raises, we convert to ErrorInfo and best-effort log to stderr; the
        Result captures the failure so the manager can thread it.
        """
        try:
            cb(snap)
        except Exception as e:
            message = f"{source} callback raised: {e}"
            log_result = self._log_stderr(f"[WarmupManager] {message}\n", source=f"warmup.{source}")
            if not log_result.ok:
                message = f"{message}; log also failed: {log_result.errors[0].message}"
            return Result(data=False, errors=[ErrorInfo(
                kind=ErrorKind.INTERNAL,
                message=message,
                source=f"warmup.{source}",
                original=e,
            )])
        return Result(data=True)

    def _snapshot(self) -> dict[str, list[str]]:
        """Copy the three progress lists. Does not lock; callers hold ``_lock``."""
        return {
            "pending": list(self._pending),
            "completed": list(self._completed),
            "failed": list(self._failed),
        }