"""Tests for AppController project loading and background project switching.

Each test builds two throwaway projects under pytest's ``tmp_path`` and checks
what the controller exposes after loading one of them or switching between
them.
"""
import json
import os
import shutil
import tempfile
import time
from pathlib import Path

import pytest

from src.app_controller import AppController
from src import models
from src.personas import PersonaManager
from src import presets, tool_presets
from src import project_manager


def _wait_for_switch(ctrl, timeout: float = 2.0) -> None:
    """Polls until any background project switch completes.

    Per startup_speedup_20260606 Phase 6 follow-up: the project switch now runs
    on AppController.submit_io (shared _io_pool) instead of a dedicated ad-hoc
    thread. We poll the public is_project_stale() flag instead of a
    thread.is_alive() check.

    Args:
        ctrl: Controller exposing ``is_project_stale()``.
        timeout: Maximum number of seconds to keep polling.

    Raises:
        AssertionError: If the controller is still stale once the timeout
            has elapsed.
    """
    # Monotonic clock: a wall-clock adjustment must not shorten or stretch
    # the timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not ctrl.is_project_stale():
            return
        time.sleep(0.02)
    # The switch may have finished during the final sleep; look once more
    # before declaring a timeout so we do not fail spuriously.
    if not ctrl.is_project_stale():
        return
    raise AssertionError("Project switch did not complete within timeout")


def _setup_two_projects(tmp_path):
    """Creates ``project_a`` and ``project_b`` directories under ``tmp_path``.

    Each directory receives a ``project.toml``, a ``project_personas.toml``
    and a ``project_presets.toml``. Project A declares persona ``PersonaA``,
    preset ``PresetA`` and ``active_preset = "PresetA"``; project B declares
    persona ``PersonaB`` and preset ``PresetB`` and no active preset.

    Returns:
        A ``(project_a_path, project_b_path)`` tuple of paths to the two
        ``project.toml`` files.
    """
    project_a_dir = tmp_path / "project_a"
    project_a_dir.mkdir()
    project_a_path = project_a_dir / "project.toml"
    project_a_path.write_text('[project]\nname = "project_a"\nactive_preset = "PresetA"\n')

    project_b_dir = tmp_path / "project_b"
    project_b_dir.mkdir()
    project_b_path = project_b_dir / "project.toml"
    project_b_path.write_text('[project]\nname = "project_b"\n')

    (project_a_dir / "project_personas.toml").write_text(
        '[personas."PersonaA"]\nsystem_prompt = "Project A persona"\npreferred_models = []\n'
    )
    (project_b_dir / "project_personas.toml").write_text(
        '[personas."PersonaB"]\nsystem_prompt = "Project B persona"\npreferred_models = []\n'
    )
    (project_a_dir / "project_presets.toml").write_text(
        '[presets."PresetA"]\nsystem_prompt = "Project A preset"\n'
    )
    (project_b_dir / "project_presets.toml").write_text(
        '[presets."PresetB"]\nsystem_prompt = "Project B preset"\n'
    )
    return project_a_path, project_b_path


def _make_controller(monkeypatch, project_path):
    """Builds an AppController that has the project at ``project_path`` loaded.

    ``_rebuild_rag_index`` and ``_flush_to_project`` are replaced with no-ops
    on the instance so the tests never execute them (presumably because they
    are slow or write to disk -- confirm against AppController). The preset,
    tool-preset and persona managers are created for the project's directory
    and ``_refresh_from_project()`` is called before returning.

    Args:
        monkeypatch: pytest's ``monkeypatch`` fixture.
        project_path: Path to the ``project.toml`` to load.

    Returns:
        The configured AppController.
    """
    ctrl = AppController()
    monkeypatch.setattr(ctrl, "_rebuild_rag_index", lambda: None)
    monkeypatch.setattr(ctrl, "_flush_to_project", lambda: None)
    ctrl.active_project_path = str(project_path)
    ctrl.project = project_manager.load_project(str(project_path))
    project_dir = Path(project_path).parent
    ctrl.preset_manager = presets.PresetManager(project_dir)
    ctrl.tool_preset_manager = tool_presets.ToolPresetManager(project_dir)
    ctrl.persona_manager = PersonaManager(project_dir)
    ctrl._refresh_from_project()
    return ctrl


def test_switch_project_resets_invalid_persona(tmp_path, monkeypatch):
    """An active persona missing from the new project is reset to ""."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    ctrl = _make_controller(monkeypatch, project_a_path)
    assert "PersonaA" in ctrl.personas
    assert "PersonaB" not in ctrl.personas
    ctrl.ui_active_persona = "PersonaA"

    ctrl._switch_project(str(project_b_path))
    _wait_for_switch(ctrl)

    assert "PersonaA" not in ctrl.personas
    assert "PersonaB" in ctrl.personas
    assert ctrl.ui_active_persona == ""


def test_switch_project_resets_invalid_preset(tmp_path, monkeypatch):
    """A project preset missing from the new project is reset to None."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    ctrl = _make_controller(monkeypatch, project_a_path)
    # Project A's project.toml sets active_preset = "PresetA".
    assert ctrl.ui_project_preset_name == "PresetA"

    ctrl._switch_project(str(project_b_path))
    _wait_for_switch(ctrl)

    assert "PresetA" not in ctrl.presets
    assert "PresetB" in ctrl.presets
    assert ctrl.ui_project_preset_name is None


def test_switch_project_resets_invalid_tool_preset(tmp_path, monkeypatch):
    """An active tool preset missing from the new project is reset to None."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    # The tool-preset files must exist before the controller is built so the
    # ToolPresetManager created for project A can see "ToolA".
    (Path(project_a_path).parent / "project_tool_presets.toml").write_text(
        '[presets."ToolA"]\ndescription = "A"\n[presets.tools."run_powershell"]\nweight = 1\n'
    )
    (Path(project_b_path).parent / "project_tool_presets.toml").write_text(
        '[presets."ToolB"]\ndescription = "B"\n[presets.tools."run_powershell"]\nweight = 1\n'
    )
    ctrl = _make_controller(monkeypatch, project_a_path)
    ctrl.ui_active_tool_preset = "ToolA"
    assert "ToolA" in ctrl.tool_presets

    ctrl._switch_project(str(project_b_path))
    _wait_for_switch(ctrl)

    assert "ToolA" not in ctrl.tool_presets
    assert "ToolB" in ctrl.tool_presets
    assert ctrl.ui_active_tool_preset is None


def test_switch_project_preserves_global_preset(tmp_path, monkeypatch):
    """The global preset name is left untouched by a project switch."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    ctrl = _make_controller(monkeypatch, project_a_path)
    ctrl.ui_global_preset_name = "GlobalPresetA"
    original_global = ctrl.ui_global_preset_name

    ctrl._switch_project(str(project_b_path))
    _wait_for_switch(ctrl)

    assert ctrl.ui_global_preset_name == original_global


def test_load_active_project_creates_persona_manager(tmp_path, monkeypatch):
    """_load_active_project() creates persona_manager on a fresh controller."""
    project_a_path, _ = _setup_two_projects(tmp_path)
    ctrl = AppController()
    monkeypatch.setattr(ctrl, "_rebuild_rag_index", lambda: None)
    monkeypatch.setattr(ctrl, "_flush_to_project", lambda: None)
    monkeypatch.setattr(ctrl, "_configure_mcp_for_project", lambda: None)
    assert not hasattr(ctrl, "persona_manager")

    ctrl.active_project_path = str(project_a_path)
    ctrl._load_active_project()

    assert hasattr(ctrl, "persona_manager")
    assert "PersonaA" in ctrl.personas
    assert ctrl.ui_active_bias_profile is None or ctrl.ui_active_bias_profile in ctrl.bias_profiles


def test_load_context_preset_missing_raises_keyerror(tmp_path, monkeypatch):
    """load_context_preset() raises KeyError for an unknown preset name."""
    project_a_path, _ = _setup_two_projects(tmp_path)
    ctrl = AppController()
    monkeypatch.setattr(ctrl, "_rebuild_rag_index", lambda: None)
    monkeypatch.setattr(ctrl, "_flush_to_project", lambda: None)
    monkeypatch.setattr(ctrl, "_configure_mcp_for_project", lambda: None)
    ctrl.active_project_path = str(project_a_path)
    ctrl._load_active_project()

    with pytest.raises(KeyError, match="Context preset 'NonexistentPreset' not found"):
        ctrl.load_context_preset("NonexistentPreset")


def test_switch_project_resets_context_files(tmp_path, monkeypatch):
    """context_files holds only the new project's files after a switch."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    # Overwrite both project.toml files with variants that declare a [files]
    # section; each project lists two distinct paths.
    proj_a_data = '''[project]
name = "project_a"
[files]
base_dir = "."
paths = [
{ path = "C:/projects/forth/bootslop/main.c", view_mode = "full" },
{ path = "C:/projects/Pikuma/ps1/code/gte_hello/hello_gte.c", view_mode = "full" },
]
'''
    project_a_path.write_text(proj_a_data)
    proj_b_data = '''[project]
name = "project_b"
[files]
base_dir = "."
paths = [
{ path = "C:/projects/gencpp/base/dependencies/timing.cpp", view_mode = "full" },
{ path = "C:/projects/gencpp/base/dependencies/timing.hpp", view_mode = "full" },
]
'''
    project_b_path.write_text(proj_b_data)

    ctrl = _make_controller(monkeypatch, project_a_path)
    assert len(ctrl.context_files) == 2
    assert any("forth" in f.path for f in ctrl.context_files)
    assert any("gte_hello" in f.path for f in ctrl.context_files)

    ctrl._switch_project(str(project_b_path))
    _wait_for_switch(ctrl)

    assert len(ctrl.context_files) == 2
    assert all("gencpp" in f.path for f in ctrl.context_files)
    assert not any("forth" in f.path for f in ctrl.context_files)
    assert not any("gte_hello" in f.path for f in ctrl.context_files)


def test_switch_project_non_blocking(tmp_path, monkeypatch):
    """The switch returns immediately; the heavy work runs in a background thread.

    UI calls see is_project_stale() == True while it's running.
    """
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    ctrl = _make_controller(monkeypatch, project_a_path)

    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be distorted by wall-clock adjustments.
    started = time.perf_counter()
    ctrl._switch_project(str(project_b_path))
    elapsed = time.perf_counter() - started

    assert elapsed < 0.2, f"_switch_project blocked the caller for {elapsed:.2f}s"
    # NOTE(review): this relies on the stale flag still being set when the
    # caller regains control -- confirm _switch_project guarantees that.
    assert ctrl.is_project_stale()
    _wait_for_switch(ctrl)
    assert not ctrl.is_project_stale()
    assert ctrl.active_project_path == str(project_b_path)


def test_api_generate_blocked_while_stale(tmp_path, monkeypatch):
    """_api_generate returns 409 when the project switch is in progress."""
    project_a_path, project_b_path = _setup_two_projects(tmp_path)
    ctrl = _make_controller(monkeypatch, project_a_path)

    ctrl._switch_project(str(project_b_path))
    assert ctrl.is_project_stale()

    # Imported locally, as in the original test, so the rest of this module
    # does not need these names at import time.
    from fastapi import HTTPException
    from src.app_controller import _api_generate
    from src.models import GenerateRequest

    req = GenerateRequest(prompt="hello")
    with pytest.raises(HTTPException) as exc_info:
        _api_generate(ctrl, req)
    assert exc_info.value.status_code == 409

    _wait_for_switch(ctrl)
    assert not ctrl.is_project_stale()