refactor(tests): Update test suite and API hooks for AppController architecture

This commit is contained in:
2026-03-04 11:38:36 -05:00
parent 8642277ef4
commit f2b25757eb
6 changed files with 214 additions and 141 deletions

View File

@@ -7,6 +7,26 @@ from typing import Any
import logging
import session_logger
def _get_app_attr(app: Any, name: str, default: Any = None) -> Any:
if hasattr(app, name):
return getattr(app, name)
if hasattr(app, 'controller') and hasattr(app.controller, name):
return getattr(app.controller, name)
return default
def _has_app_attr(app: Any, name: str) -> bool:
if hasattr(app, name): return True
if hasattr(app, 'controller') and hasattr(app.controller, name): return True
return False
def _set_app_attr(app: Any, name: str, value: Any) -> None:
if hasattr(app, name):
setattr(app, name, value)
elif hasattr(app, 'controller'):
setattr(app.controller, name, value)
else:
setattr(app, name, value)
class HookServerInstance(ThreadingHTTPServer):
"""Custom HTTPServer that carries a reference to the main App instance."""
def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
@@ -28,14 +48,19 @@ class HookHandler(BaseHTTPRequestHandler):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
flat = project_manager.flat_config(app.project)
flat = project_manager.flat_config(_get_app_attr(app, 'project'))
self.wfile.write(json.dumps({'project': flat}).encode('utf-8'))
elif self.path == '/api/session':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
with app._disc_entries_lock:
entries_snapshot = list(app.disc_entries)
lock = _get_app_attr(app, '_disc_entries_lock')
entries = _get_app_attr(app, 'disc_entries', [])
if lock:
with lock:
entries_snapshot = list(entries)
else:
entries_snapshot = list(entries)
self.wfile.write(
json.dumps({'session': {'entries': entries_snapshot}}).
encode('utf-8'))
@@ -44,8 +69,9 @@ class HookHandler(BaseHTTPRequestHandler):
self.send_header('Content-Type', 'application/json')
self.end_headers()
metrics = {}
if hasattr(app, 'perf_monitor'):
metrics = app.perf_monitor.get_metrics()
perf = _get_app_attr(app, 'perf_monitor')
if perf:
metrics = perf.get_metrics()
self.wfile.write(json.dumps({'performance': metrics}).encode('utf-8'))
elif self.path == '/api/events':
# Long-poll or return current event queue
@@ -53,10 +79,16 @@ class HookHandler(BaseHTTPRequestHandler):
self.send_header('Content-Type', 'application/json')
self.end_headers()
events = []
if hasattr(app, '_api_event_queue'):
with app._api_event_queue_lock:
events = list(app._api_event_queue)
app._api_event_queue.clear()
if _has_app_attr(app, '_api_event_queue'):
lock = _get_app_attr(app, '_api_event_queue_lock')
queue = _get_app_attr(app, '_api_event_queue')
if lock:
with lock:
events = list(queue)
queue.clear()
else:
events = list(queue)
queue.clear()
self.wfile.write(json.dumps({'events': events}).encode('utf-8'))
elif self.path == '/api/gui/value':
# POST with {"field": "field_tag"} to get value
@@ -69,17 +101,20 @@ class HookHandler(BaseHTTPRequestHandler):
def get_val():
try:
if field_tag in app._settable_fields:
attr = app._settable_fields[field_tag]
val = getattr(app, attr, None)
result["value"] = val
settable = _get_app_attr(app, '_settable_fields', {})
if field_tag in settable:
attr = settable[field_tag]
result["value"] = _get_app_attr(app, attr, None)
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_val
})
lock = _get_app_attr(app, '_pending_gui_tasks_lock')
tasks = _get_app_attr(app, '_pending_gui_tasks')
if lock and tasks is not None:
with lock:
tasks.append({
"action": "custom_callback",
"callback": get_val
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -96,16 +131,20 @@ class HookHandler(BaseHTTPRequestHandler):
def get_val():
try:
if field_tag in app._settable_fields:
attr = app._settable_fields[field_tag]
result["value"] = getattr(app, attr, None)
settable = _get_app_attr(app, '_settable_fields', {})
if field_tag in settable:
attr = settable[field_tag]
result["value"] = _get_app_attr(app, attr, None)
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_val
})
lock = _get_app_attr(app, '_pending_gui_tasks_lock')
tasks = _get_app_attr(app, '_pending_gui_tasks')
if lock and tasks is not None:
with lock:
tasks.append({
"action": "custom_callback",
"callback": get_val
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -120,30 +159,33 @@ class HookHandler(BaseHTTPRequestHandler):
def get_mma():
try:
result["mma_status"] = getattr(app, "mma_status", "idle")
result["ai_status"] = getattr(app, "ai_status", "idle")
result["active_tier"] = getattr(app, "active_tier", None)
at = getattr(app, "active_track", None)
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
result["active_tier"] = _get_app_attr(app, "active_tier", None)
at = _get_app_attr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = getattr(app, "active_tickets", [])
result["mma_step_mode"] = getattr(app, "mma_step_mode", False)
result["pending_tool_approval"] = getattr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = getattr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = getattr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = getattr(app, "_pending_mma_spawn", None) is not None
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = getattr(app, "tracks", [])
result["proposed_tracks"] = getattr(app, "proposed_tracks", [])
result["mma_streams"] = getattr(app, "mma_streams", {})
result["mma_tier_usage"] = getattr(app, "mma_tier_usage", {})
result["tracks"] = _get_app_attr(app, "tracks", [])
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_mma
})
lock = _get_app_attr(app, '_pending_gui_tasks_lock')
tasks = _get_app_attr(app, '_pending_gui_tasks')
if lock and tasks is not None:
with lock:
tasks.append({
"action": "custom_callback",
"callback": get_mma
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -158,17 +200,20 @@ class HookHandler(BaseHTTPRequestHandler):
def check_all():
try:
status = getattr(app, "ai_status", "idle")
status = _get_app_attr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = getattr(app, "is_viewing_prior_session", False)
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": check_all
})
lock = _get_app_attr(app, '_pending_gui_tasks_lock')
tasks = _get_app_attr(app, '_pending_gui_tasks')
if lock and tasks is not None:
with lock:
tasks.append({
"action": "custom_callback",
"callback": check_all
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -191,7 +236,8 @@ class HookHandler(BaseHTTPRequestHandler):
try:
data = json.loads(body_str) if body_str else {}
if self.path == '/api/project':
app.project = data.get('project', app.project)
project = _get_app_attr(app, 'project')
_set_app_attr(app, 'project', data.get('project', project))
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
@@ -199,8 +245,9 @@ class HookHandler(BaseHTTPRequestHandler):
elif self.path.startswith('/api/confirm/'):
action_id = self.path.split('/')[-1]
approved = data.get('approved', False)
if hasattr(app, 'resolve_pending_action'):
success = app.resolve_pending_action(action_id, approved)
resolve_func = _get_app_attr(app, 'resolve_pending_action')
if resolve_func:
success = resolve_func(action_id, approved)
if success:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -213,15 +260,24 @@ class HookHandler(BaseHTTPRequestHandler):
self.send_response(500)
self.end_headers()
elif self.path == '/api/session':
with app._disc_entries_lock:
app.disc_entries = data.get('session', {}).get('entries', app.disc_entries)
lock = _get_app_attr(app, '_disc_entries_lock')
entries = _get_app_attr(app, 'disc_entries')
new_entries = data.get('session', {}).get('entries', entries)
if lock:
with lock:
_set_app_attr(app, 'disc_entries', new_entries)
else:
_set_app_attr(app, 'disc_entries', new_entries)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path == '/api/gui':
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append(data)
lock = _get_app_attr(app, '_pending_gui_tasks_lock')
tasks = _get_app_attr(app, '_pending_gui_tasks')
if lock and tasks is not None:
with lock:
tasks.append(data)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
@@ -229,35 +285,65 @@ class HookHandler(BaseHTTPRequestHandler):
elif self.path == '/api/ask':
request_id = str(uuid.uuid4())
event = threading.Event()
if not hasattr(app, '_pending_asks'): app._pending_asks = {}
if not hasattr(app, '_ask_responses'): app._ask_responses = {}
app._pending_asks[request_id] = event
with app._api_event_queue_lock:
app._api_event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
pending_asks = _get_app_attr(app, '_pending_asks')
if pending_asks is None:
pending_asks = {}
_set_app_attr(app, '_pending_asks', pending_asks)
ask_responses = _get_app_attr(app, '_ask_responses')
if ask_responses is None:
ask_responses = {}
_set_app_attr(app, '_ask_responses', ask_responses)
pending_asks[request_id] = event
event_queue_lock = _get_app_attr(app, '_api_event_queue_lock')
event_queue = _get_app_attr(app, '_api_event_queue')
if event_queue is not None:
if event_queue_lock:
with event_queue_lock:
event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
else:
event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
gui_tasks_lock = _get_app_attr(app, '_pending_gui_tasks_lock')
gui_tasks = _get_app_attr(app, '_pending_gui_tasks')
if gui_tasks is not None:
if gui_tasks_lock:
with gui_tasks_lock:
gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
else:
gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
if event.wait(timeout=60.0):
response_data = app._ask_responses.get(request_id)
if request_id in app._ask_responses: del app._ask_responses[request_id]
response_data = ask_responses.get(request_id)
if request_id in ask_responses: del ask_responses[request_id]
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok', 'response': response_data}).encode('utf-8'))
else:
if request_id in app._pending_asks: del app._pending_asks[request_id]
if request_id in pending_asks: del pending_asks[request_id]
self.send_response(504)
self.end_headers()
self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
elif self.path == '/api/ask/respond':
request_id = data.get('request_id')
response_data = data.get('response')
if request_id and hasattr(app, '_pending_asks') and request_id in app._pending_asks:
app._ask_responses[request_id] = response_data
event = app._pending_asks[request_id]
pending_asks = _get_app_attr(app, '_pending_asks')
ask_responses = _get_app_attr(app, '_ask_responses')
if request_id and pending_asks and request_id in pending_asks:
ask_responses[request_id] = response_data
event = pending_asks[request_id]
event.set()
del app._pending_asks[request_id]
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({"action": "clear_ask", "request_id": request_id})
del pending_asks[request_id]
gui_tasks_lock = _get_app_attr(app, '_pending_gui_tasks_lock')
gui_tasks = _get_app_attr(app, '_pending_gui_tasks')
if gui_tasks is not None:
if gui_tasks_lock:
with gui_tasks_lock:
gui_tasks.append({"action": "clear_ask", "request_id": request_id})
else:
gui_tasks.append({"action": "clear_ask", "request_id": request_id})
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
@@ -274,8 +360,8 @@ class HookHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
def log_message(self, format: str, *args: Any) -> None:
logging.info("Hook API: " + format % args)
def log_message(self, format: str, *args: Any) -> None:
logging.info("Hook API: " + format % args)
class HookServer:
def __init__(self, app: Any, port: int = 8999) -> None:
@@ -287,15 +373,15 @@ class HookServer:
def start(self) -> None:
if self.thread and self.thread.is_alive():
return
is_gemini_cli = getattr(self.app, 'current_provider', '') == 'gemini_cli'
if not getattr(self.app, 'test_hooks_enabled', False) and not is_gemini_cli:
is_gemini_cli = _get_app_attr(self.app, 'current_provider', '') == 'gemini_cli'
if not _get_app_attr(self.app, 'test_hooks_enabled', False) and not is_gemini_cli:
return
if not hasattr(self.app, '_pending_gui_tasks'): self.app._pending_gui_tasks = []
if not hasattr(self.app, '_pending_gui_tasks_lock'): self.app._pending_gui_tasks_lock = threading.Lock()
if not hasattr(self.app, '_pending_asks'): self.app._pending_asks = {}
if not hasattr(self.app, '_ask_responses'): self.app._ask_responses = {}
if not hasattr(self.app, '_api_event_queue'): self.app._api_event_queue = []
if not hasattr(self.app, '_api_event_queue_lock'): self.app._api_event_queue_lock = threading.Lock()
if not _has_app_attr(self.app, '_pending_gui_tasks'): _set_app_attr(self.app, '_pending_gui_tasks', [])
if not _has_app_attr(self.app, '_pending_gui_tasks_lock'): _set_app_attr(self.app, '_pending_gui_tasks_lock', threading.Lock())
if not _has_app_attr(self.app, '_pending_asks'): _set_app_attr(self.app, '_pending_asks', {})
if not _has_app_attr(self.app, '_ask_responses'): _set_app_attr(self.app, '_ask_responses', {})
if not _has_app_attr(self.app, '_api_event_queue'): _set_app_attr(self.app, '_api_event_queue', [])
if not _has_app_attr(self.app, '_api_event_queue_lock'): _set_app_attr(self.app, '_api_event_queue_lock', threading.Lock())
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()

View File

@@ -160,6 +160,7 @@ class AppController:
self.perf_monitor: PerformanceMonitor = PerformanceMonitor()
self._pending_gui_tasks: List[Dict[str, Any]] = []
self._api_event_queue: List[Dict[str, Any]] = []
# Pending dialogs state moved from App
self._pending_dialog: Optional[ConfirmDialog] = None
@@ -409,15 +410,15 @@ class AppController:
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
def start_services(self):
def start_services(self, app: Any = None):
"""Starts background threads and async event loop."""
self._prune_old_logs()
self._init_ai_and_hooks()
self._init_ai_and_hooks(app)
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
def _init_ai_and_hooks(self) -> None:
def _init_ai_and_hooks(self, app: Any = None) -> None:
import api_hooks
ai_client.set_provider(self._current_provider, self._current_model)
if self._current_provider == "gemini_cli":
@@ -460,7 +461,7 @@ class AppController:
'manual_approve': 'ui_manual_approve'
}
self.hook_server = api_hooks.HookServer(self)
self.hook_server = api_hooks.HookServer(app if app else self)
self.hook_server.start()
def _run_event_loop(self):

View File

@@ -104,7 +104,7 @@ class App:
# Initialize controller and delegate state
self.controller = AppController()
self.controller.init_state()
self.controller.start_services()
self.controller.start_services(self)
# Aliases for controller-owned locks
self._send_thread_lock = self.controller._send_thread_lock
@@ -177,30 +177,6 @@ class App:
"""UI-level wrapper for approving a pending MMA sub-agent spawn."""
self._handle_mma_respond(approved=True)
def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None:
"""Delegates MMA approval response to the controller."""
self.controller._handle_mma_respond(approved, payload, abort, prompt, context_md)
def _handle_approve_ask(self, user_data=None) -> None:
"""Delegates tool approval to the controller."""
self.controller._handle_approve_ask()
def _handle_reject_ask(self, user_data=None) -> None:
"""Delegates tool rejection to the controller."""
self.controller._handle_reject_ask()
def _handle_reset_session(self, user_data=None) -> None:
"""Delegates session reset to the controller."""
self.controller._handle_reset_session()
def _handle_md_only(self, user_data=None) -> None:
"""Delegates 'MD Only' logic to the controller."""
self.controller._handle_md_only()
def _handle_generate_send(self, user_data=None) -> None:
"""Delegates 'Gen + Send' logic to the controller."""
self.controller._handle_generate_send()
def __getattr__(self, name: str) -> Any:
if name != 'controller' and hasattr(self, 'controller') and hasattr(self.controller, name):
return getattr(self.controller, name)
@@ -508,6 +484,7 @@ class App:
self.perf_monitor.start_frame()
# Process GUI task queue
self._process_pending_gui_tasks()
self._process_pending_history_adds()
self._render_track_proposal_modal()
# Auto-save (every 60s)
now = time.time()

View File

@@ -88,7 +88,7 @@ def mock_app() -> Generator[App, None, None]:
Mock version of the App for simple unit tests that don't need a loop.
"""
with (
patch('gui_2.load_config', return_value={
patch('src.models.load_config', return_value={
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}}
@@ -97,16 +97,19 @@ def mock_app() -> Generator[App, None, None]:
patch('gui_2.project_manager'),
patch('gui_2.session_logger'),
patch('gui_2.immapp.run'),
patch.object(App, '_load_active_project'),
patch.object(App, '_fetch_models'),
patch('src.app_controller.AppController._load_active_project'),
patch('src.app_controller.AppController._fetch_models'),
patch.object(App, '_load_fonts'),
patch.object(App, '_post_init'),
patch.object(App, '_prune_old_logs'),
patch.object(App, '_init_ai_and_hooks'),
patch('src.app_controller.AppController._prune_old_logs'),
patch('src.app_controller.AppController.start_services'),
patch('src.app_controller.AppController._init_ai_and_hooks'),
patch('gui_2.PerformanceMonitor')
):
app = App()
yield app
if hasattr(app, 'controller'):
app.controller.stop_services()
if hasattr(app, 'shutdown'):
app.shutdown()
@@ -117,7 +120,7 @@ def app_instance() -> Generator[App, None, None]:
Matches the pattern used in test_token_viz.py and test_gui_phase4.py.
"""
with (
patch('gui_2.load_config', return_value={
patch('src.models.load_config', return_value={
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}}
@@ -126,22 +129,28 @@ def app_instance() -> Generator[App, None, None]:
patch('gui_2.project_manager'),
patch('gui_2.session_logger'),
patch('gui_2.immapp.run'),
patch.object(App, '_load_active_project'),
patch.object(App, '_fetch_models'),
patch('src.app_controller.AppController._load_active_project'),
patch('src.app_controller.AppController._fetch_models'),
patch.object(App, '_load_fonts'),
patch.object(App, '_post_init'),
patch.object(App, '_prune_old_logs'),
patch.object(App, '_init_ai_and_hooks'),
patch('src.app_controller.AppController._prune_old_logs'),
patch('src.app_controller.AppController.start_services'),
patch('src.app_controller.AppController._init_ai_and_hooks'),
patch('gui_2.PerformanceMonitor')
):
app = App()
yield app
# Cleanup: Ensure background threads and asyncio loop are stopped
if hasattr(app, 'controller'):
app.controller.stop_services()
if hasattr(app, 'shutdown'):
app.shutdown()
if hasattr(app, '_loop') and not app._loop.is_closed():
tasks = [t for t in asyncio.all_tasks(app._loop) if not t.done()]
# Use controller._loop for cleanup
loop = getattr(app.controller, '_loop', None) if hasattr(app, 'controller') else None
if loop and not loop.is_closed():
tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
if tasks:
# Cancel tasks so they can be gathered
for task in tasks:
@@ -149,14 +158,14 @@ def app_instance() -> Generator[App, None, None]:
# We can't really run the loop if it's already stopping or thread is dead,
# but we try to be clean.
try:
if app._loop.is_running():
app._loop.call_soon_threadsafe(app._loop.stop)
if loop.is_running():
loop.call_soon_threadsafe(loop.stop)
except: pass
# Finally close the loop if we can
try:
if not app._loop.is_running():
app._loop.close()
if not loop.is_running():
loop.close()
except: pass
@pytest.fixture(scope="session")

View File

@@ -1,12 +1,12 @@
import pytest
from unittest.mock import MagicMock, patch
from gui_2 import App
from events import UserRequestEvent
from src.events import UserRequestEvent
@pytest.fixture
def mock_gui() -> App:
with (
patch('gui_2.load_config', return_value={
patch('src.models.load_config', return_value={
"ai": {"provider": "gemini", "model": "model-1"},
"projects": {"paths": [], "active": ""},
"gui": {"show_windows": {}}
@@ -15,8 +15,8 @@ def mock_gui() -> App:
patch('gui_2.project_manager.migrate_from_legacy_config', return_value={}),
patch('gui_2.project_manager.save_project'),
patch('gui_2.session_logger.open_session'),
patch('gui_2.App._init_ai_and_hooks'),
patch('gui_2.App._fetch_models')
patch('src.app_controller.AppController._init_ai_and_hooks'),
patch('src.app_controller.AppController._fetch_models')
):
gui = App()
return gui

View File

@@ -43,7 +43,7 @@ def test_add_ticket_logic(mock_app: App):
mock_imgui.get_window_draw_list.return_value.add_rect_filled = MagicMock()
# We also need to mock _push_mma_state_update
with patch.object(mock_app, '_push_mma_state_update') as mock_push:
with patch.object(mock_app.controller, '_push_mma_state_update') as mock_push:
mock_app._render_mma_dashboard()
# Verify ticket was added
@@ -93,7 +93,7 @@ def test_delete_ticket_logic(mock_app: App):
mock_imgui.ImVec2 = MagicMock
mock_imgui.ImVec4 = MagicMock
with patch('gui_2.C_LBL', MagicMock()), patch.object(mock_app, '_push_mma_state_update') as mock_push:
with patch('gui_2.C_LBL', MagicMock()), patch.object(mock_app.controller, '_push_mma_state_update') as mock_push:
# Render T-001
mock_app._render_ticket_dag_node(mock_app.active_tickets[0], tickets_by_id, children_map, rendered)
@@ -110,8 +110,8 @@ def test_track_discussion_toggle(mock_app: App):
patch('gui_2.imgui') as mock_imgui,
patch('gui_2.project_manager.load_track_history', return_value=["@2026-03-01 12:00:00\n[User]\nTrack Hello"]) as mock_load,
patch('gui_2.project_manager.str_to_entry', side_effect=lambda s, roles: {"ts": "12:00", "role": "User", "content": s.split("\n")[-1]}),
patch.object(mock_app, '_flush_disc_entries_to_project') as mock_flush,
patch.object(mock_app, '_switch_discussion') as mock_switch
patch.object(mock_app.controller, '_flush_disc_entries_to_project') as mock_flush,
patch.object(mock_app.controller, '_switch_discussion') as mock_switch
):
# Track calls to ensure we only return 'changed=True' once to avoid loops
calls = {"Track Discussion": 0}
@@ -163,9 +163,9 @@ def test_track_discussion_toggle(mock_app: App):
def test_push_mma_state_update(mock_app: App):
mock_app.active_tickets = [{"id": "T-001", "description": "desc", "status": "todo", "assigned_to": "tier3-worker", "depends_on": []}]
with patch('project_manager.save_track_state') as mock_save, \
patch('project_manager.load_track_state', return_value=None):
mock_app._push_mma_state_update()
with patch('src.project_manager.save_track_state') as mock_save, \
patch('src.project_manager.load_track_state', return_value=None):
mock_app.controller._push_mma_state_update()
assert len(mock_app.active_track.tickets) == 1
assert mock_app.active_track.tickets[0].id == "T-001"