Private
Public Access
0
0

more org of app controller

This commit is contained in:
2026-06-07 02:14:06 -04:00
parent 285b1d3542
commit b3931948cc
+437 -429
View File
@@ -1526,25 +1526,25 @@ class AppController:
def _init_actions(self) -> None:
# Set up state-related action maps
self._clickable_actions: dict[str, Callable[..., Any]] = {
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_ask,
'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True),
'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True),
'btn_prune_logs': self.cb_prune_logs,
'btn_reset_base_prompt': self._cb_reset_base_prompt,
'btn_reset': self._handle_reset_session,
'btn_gen_send': self._handle_generate_send,
'btn_md_only': self._handle_md_only,
'btn_approve_script': self._handle_approve_script,
'btn_reject_script': self._handle_reject_script,
'btn_project_save': self._cb_project_save,
'btn_disc_create': self._cb_disc_create,
'btn_mma_plan_epic': self._cb_plan_epic,
'btn_mma_accept_tracks': self._cb_accept_tracks,
'btn_mma_start_track': self._cb_start_track,
'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type),
'btn_approve_tool': self._handle_approve_ask,
'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True),
'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True),
'btn_prune_logs': self.cb_prune_logs,
'btn_reset_base_prompt': self._cb_reset_base_prompt,
'btn_show_base_prompt_diff': self._cb_show_base_prompt_diff,
'btn_rebuild_rag_index': self._rebuild_rag_index,
'btn_clear_summary_cache': self._handle_clear_summary_cache,
'btn_rebuild_rag_index': self._rebuild_rag_index,
'btn_clear_summary_cache': self._handle_clear_summary_cache,
}
self._drag_actions: dict[str, Callable[..., Any]] = {}
self._right_clickable_actions: dict[str, Callable[..., Any]] = {}
@@ -1571,48 +1571,6 @@ class AppController:
else:
ai_client._gemini_cli_adapter.binary_path = str(path)
def _set_rag_status(self, status: str) -> None:
"""Thread-safe update of rag_status via the GUI task queue."""
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "set_value",
"item": "rag_status",
"value": status
})
def _rebuild_rag_index(self) -> None:
"""Background thread that re-indexes all files in the current project."""
if not self.rag_config or not self.rag_config.enabled or not self.rag_engine:
return
def _run():
try:
self._set_rag_status("indexing...")
import concurrent.futures
# 1. Incremental indexing of current files in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
def do_index(p):
if self.rag_engine: self.rag_engine.index_file(p)
for f in self.files:
path = f.path if hasattr(f, "path") else str(f)
futures.append(executor.submit(do_index, path))
concurrent.futures.wait(futures)
# 2. Cleanup stale entries (files no longer tracked)
indexed_paths = self.rag_engine.get_all_indexed_paths()
current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files}
stale_paths = [p for p in indexed_paths if p not in current_paths]
if stale_paths:
self.rag_engine.delete_documents_by_path(stale_paths)
self._set_rag_status("ready")
except Exception as e:
self._set_rag_status(f"error: {e}")
self.submit_io(_run)
def _trigger_gui_refresh(self):
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({'action': 'set_comms_dirty'})
@@ -2343,10 +2301,8 @@ class AppController:
def shutdown(self) -> None:
"""
Stops background threads and cleans up resources.
[C: src/gui_2.py:App.run, src/gui_2.py:App.shutdown, tests/conftest.py:app_instance, tests/conftest.py:mock_app]
Stops background threads and cleans up resources.
[C: src/gui_2.py:App.run, src/gui_2.py:App.shutdown, tests/conftest.py:app_instance, tests/conftest.py:mock_app]
"""
from src import ai_client
ai_client.cleanup()
@@ -2451,10 +2407,8 @@ class AppController:
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
"""
Processes a UserRequestEvent by calling the AI client.
[C: tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_rag_integration.py:test_rag_integration]
Processes a UserRequestEvent by calling the AI client.
[C: tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_rag_integration.py:test_rag_integration]
"""
self.ai_status = 'sending...'
@@ -2531,127 +2485,6 @@ class AppController:
except Exception as e:
self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"})
def _offload_entry_payload(self, entry: Dict[str, Any]) -> Dict[str, Any]:
optimized = copy.deepcopy(entry)
kind = optimized.get("kind")
payload = optimized.get("payload", {})
if kind == "tool_result" and "output" in payload:
output = payload["output"]
ref_path = session_logger.log_tool_output(output)
if ref_path:
filename = Path(ref_path).name
payload["output"] = f"[REF:{filename}]"
if kind == "tool_call" and "script" in payload:
script = payload["script"]
ref_path = session_logger.log_tool_call(script, "LOG_ONLY", None)
if ref_path:
filename = Path(ref_path).name
payload["script"] = f"[REF:{filename}]"
return optimized
def _on_ai_stream(self, text: str) -> None:
"""Handles streaming text from the AI."""
self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"})
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
"""
[C: tests/test_app_controller_offloading.py:test_on_comms_entry_tool_result_offloading]
"""
optimized_entry = self._offload_entry_payload(entry)
session_logger.log_comms(optimized_entry)
entry["local_ts"] = time.time()
kind = entry.get("kind")
payload = entry.get("payload", {})
if kind == "response" and "usage" in payload:
u = payload["usage"]
inp = u.get("input_tokens") or u.get("prompt_tokens") or 0
out = u.get("output_tokens") or u.get("completion_tokens") or 0
cache_read = u.get("cache_read_input_tokens") or 0
cache_create = u.get("cache_creation_input_tokens") or 0
total = u.get("total_tokens") or 0
# Store normalized usage back in payload for history rendering
u["input_tokens"] = inp
u["output_tokens"] = out
u["cache_read_input_tokens"] = cache_read
self.session_usage["input_tokens"] += inp
self.session_usage["output_tokens"] += out
self.session_usage["cache_read_input_tokens"] += cache_read
self.session_usage["cache_creation_input_tokens"] += cache_create
self.session_usage["total_tokens"] += total
input_t = u.get("input_tokens") or 0
output_t = u.get("output_tokens") or 0
model = payload.get("model", "unknown")
self._token_history.append({
"time": time.time(),
"input": input_t,
"output": output_t,
"model": model
})
if kind == "request":
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": payload.get("message", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "response":
if self.ui_auto_add_history:
role = payload.get("role", "AI")
text_content = payload.get("text", "")
if text_content.strip():
segments, parsed_response = thinking_parser.parse_thinking_trace(text_content)
entry_obj = {
"role": role,
"content": parsed_response.strip() if parsed_response else "",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
}
if "usage" in payload:
entry_obj["usage"] = payload["usage"]
if segments:
entry_obj["thinking_segments"] = [{"content": s.content, "marker": s.marker} for s in segments]
if entry_obj["content"] or segments:
with self._pending_history_adds_lock:
self._pending_history_adds.append(entry_obj)
if kind in ("tool_result", "tool_call"):
if self.ui_auto_add_history:
role = "Tool" if kind == "tool_result" else "Vendor API"
content = ""
if kind == "tool_result":
content = payload.get("output", "")
else:
content = payload.get("script") or payload.get("args") or payload.get("message", "")
if isinstance(content, dict):
content = json.dumps(content, indent=1)
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": f"[{kind.upper().replace('_', ' ')}]\n{content}",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str) -> None:
"""
[C: tests/test_app_controller_offloading.py:test_on_tool_log_offloading]
@@ -2662,125 +2495,6 @@ class AppController:
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None:
"""
[C: tests/test_gui_updates.py:test_gui_updates_on_event]
"""
payload = kwargs.get("payload", {})
# Push to background event queue, NOT GUI queue
self.event_queue.put("refresh_api_metrics", payload)
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": event_name, "payload": payload})
def _on_performance_alert(self, message: str) -> None:
self.diagnostic_log.append({
"ts": project_manager.now_ts(),
"message": message,
"type": "performance"
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
"""
[C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch]
"""
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self.ai_status = "powershell done, awaiting AI..."
return output
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
approved, final_script = dialog.wait()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None:
"""
[C: tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_has_source_tier, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_keys, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_stores_dict]
"""
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
tool_name = self._extract_tool_name(script)
is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower()
if tool_name:
if tool_name not in self._tool_stats:
self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0}
self._tool_stats[tool_name]["count"] += 1
self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms
if is_failure:
self._tool_stats[tool_name]["failures"] += 1
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def _extract_tool_name(self, script: str) -> str:
if not script:
return "unknown"
script_lower = script.lower()
if "powershell" in script_lower or "run_powershell" in script_lower:
return "run_powershell"
if "read_file" in script_lower:
return "read_file"
if "write_file" in script_lower or "write" in script_lower:
return "write_file"
if "list_directory" in script_lower or "ls" in script_lower:
return "list_directory"
if "search_files" in script_lower or "glob" in script_lower:
return "search_files"
if "web_search" in script_lower:
return "web_search"
if "fetch_url" in script_lower:
return "fetch_url"
if "py_get" in script_lower:
return "py_get_skeleton"
return "other"
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
elif self._pending_dialog and self._pending_dialog._uid == action_id:
dialog = self._pending_dialog
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
return False
@property
def _pending_mma_spawn(self) -> Optional[Dict[str, Any]]:
return self._pending_mma_spawns[0] if self._pending_mma_spawns else None
@@ -2913,21 +2627,6 @@ class AppController:
return _api_token_stats(self)
return api
def _cb_new_project_automated(self, user_data: Any) -> None:
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self) -> None:
self._flush_to_project()
self._flush_to_config()
models.save_config(self.config)
self.ai_status = "config saved"
def _cb_reset_base_prompt(self, user_data=None) -> None:
"""
[C: src/gui_2.py:App._render_system_prompts_panel]
@@ -2935,14 +2634,6 @@ class AppController:
self.ui_base_system_prompt = ai_client._SYSTEM_PROMPT
self.ui_use_default_base_prompt = False
def _cb_clear_summary_cache(self, user_data=None) -> None:
"""
[C: src/gui_2.py:App._render_files_panel]
"""
from src import summarize
summarize._summary_cache.clear()
self._push_mma_state_update()
def _handle_clear_summary_cache(self, user_data: Any = None) -> None:
self.summary_cache.clear()
self.ai_status = 'summary cache cleared'
@@ -2953,100 +2644,6 @@ class AppController:
"""
self._show_base_prompt_diff_modal = True
def _cb_disc_create(self) -> None:
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _configure_mcp_for_project(self) -> None:
if not self.active_project_path:
return
project_root = Path(self.active_project_path).parent
file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files]
mcp_client.configure(file_items_as_dicts, [str(project_root)])
def _switch_project(self, path: str) -> None:
"""
[C: src/gui_2.py:App._render_projects_panel]
Non-blocking: returns immediately, marks the controller as stale,
and runs the actual save/load work in a background thread so the
render loop keeps drawing and lightweight UI interactions (scrolling,
selecting tabs) remain responsive.
"""
if path == self.active_project_path and not self.is_project_stale():
return
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
with self._project_switch_lock:
if self._project_switch_in_progress:
if self._project_switch_pending_path == path:
return
self._project_switch_pending_path = path
self.ai_status = f"switch queued: {Path(path).stem} (waiting on {Path(self._project_switch_pending_path or '').stem})"
return
self._project_switch_in_progress = True
self._project_switch_pending_path = path
self.ai_status = f"switching to: {Path(path).stem} (stale ui - ops disabled)"
self.submit_io(self._do_project_switch, path)
def _do_project_switch(self, path: str) -> None:
try:
self._flush_to_project()
try:
new_project = project_manager.load_project(path)
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
try:
self.project = new_project
self.active_project_path = path
new_root = Path(path).parent
self.preset_manager = presets.PresetManager(new_root)
self.tool_preset_manager = tool_presets.ToolPresetManager(new_root)
from src.personas import PersonaManager
self.persona_manager = PersonaManager(new_root)
except Exception as e:
self.ai_status = f"failed to init managers: {e}"
return
self._refresh_from_project()
file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files]
mcp_client.configure(file_items_as_dicts, [str(new_root)])
self.ai_status = f"switched to: {Path(path).stem}"
finally:
with self._project_switch_lock:
pending = self._project_switch_pending_path
self._project_switch_in_progress = False
self._project_switch_pending_path = None
if pending and pending != self.active_project_path and Path(pending).exists():
self._switch_project(pending)
def save_context_preset(self, preset: models.ContextPreset) -> None:
self.context_preset_manager.save_preset(self.project, preset)
self._save_active_project()
def load_context_preset(self, name: str) -> models.ContextPreset:
presets = self.context_preset_manager.load_all(self.project)
if name not in presets:
raise KeyError(f"Context preset '{name}' not found.")
preset = presets[name]
# Update only temporary context state, not project files
import copy
self.context_files = []
for f in preset.files:
fi = models.FileItem(path=f.path, view_mode=f.view_mode)
fi.custom_slices = copy.deepcopy(f.custom_slices) if hasattr(f, 'custom_slices') else []
fi.ast_mask = copy.deepcopy(f.ast_mask) if hasattr(f, 'ast_mask') else {}
fi.ast_signatures = getattr(f, 'ast_signatures', False)
fi.ast_definitions = getattr(f, 'ast_definitions', False)
self.context_files.append(fi)
self.screenshots = list(preset.screenshots)
return preset
def clear_last_error(self) -> None:
"""Reset last_error after a successful response cycle.
[C: src/vendor_state.py:get_vendor_state]
@@ -3173,6 +2770,17 @@ class AppController:
#endregion: Serialization
#region: Diagnostics
def _on_performance_alert(self, message: str) -> None:
self.diagnostic_log.append({
"ts": project_manager.now_ts(),
"message": message,
"type": "performance"
})
#endregion: Diagnostics
#region: Usage Analytics
def get_session_insights(self) -> Dict[str, Any]:
@@ -3250,6 +2858,38 @@ class AppController:
#region: Context
def _cb_clear_summary_cache(self, user_data=None) -> None:
"""
[C: src/gui_2.py:App._render_files_panel]
"""
from src import summarize
summarize._summary_cache.clear()
self._push_mma_state_update()
def save_context_preset(self, preset: models.ContextPreset) -> None:
self.context_preset_manager.save_preset(self.project, preset)
self._save_active_project()
def load_context_preset(self, name: str) -> models.ContextPreset:
presets = self.context_preset_manager.load_all(self.project)
if name not in presets:
raise KeyError(f"Context preset '{name}' not found.")
preset = presets[name]
# Update only temporary context state, not project files
import copy
self.context_files = []
for f in preset.files:
fi = models.FileItem(path=f.path, view_mode=f.view_mode)
fi.custom_slices = copy.deepcopy(f.custom_slices) if hasattr(f, 'custom_slices') else []
fi.ast_mask = copy.deepcopy(f.ast_mask) if hasattr(f, 'ast_mask') else {}
fi.ast_signatures = getattr(f, 'ast_signatures', False)
fi.ast_definitions = getattr(f, 'ast_definitions', False)
self.context_files.append(fi)
self.screenshots = list(preset.screenshots)
return preset
def _update_cached_stats(self) -> None:
from src import ai_client
self._cached_cache_stats = ai_client.get_gemini_cache_stats()
@@ -3265,7 +2905,132 @@ class AppController:
#endregion: Context
#region: Project
#region: RAG
def _set_rag_status(self, status: str) -> None:
"""Thread-safe update of rag_status via the GUI task queue."""
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "set_value",
"item": "rag_status",
"value": status
})
def _rebuild_rag_index(self) -> None:
"""Background thread that re-indexes all files in the current project."""
if not self.rag_config or not self.rag_config.enabled or not self.rag_engine:
return
def _run():
try:
self._set_rag_status("indexing...")
import concurrent.futures
# 1. Incremental indexing of current files in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
def do_index(p):
if self.rag_engine: self.rag_engine.index_file(p)
for f in self.files:
path = f.path if hasattr(f, "path") else str(f)
futures.append(executor.submit(do_index, path))
concurrent.futures.wait(futures)
# 2. Cleanup stale entries (files no longer tracked)
indexed_paths = self.rag_engine.get_all_indexed_paths()
current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files}
stale_paths = [p for p in indexed_paths if p not in current_paths]
if stale_paths:
self.rag_engine.delete_documents_by_path(stale_paths)
self._set_rag_status("ready")
except Exception as e:
self._set_rag_status(f"error: {e}")
self.submit_io(_run)
#endregion: RAG
#region: Project
def _configure_mcp_for_project(self) -> None:
if not self.active_project_path:
return
project_root = Path(self.active_project_path).parent
file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files]
mcp_client.configure(file_items_as_dicts, [str(project_root)])
def _cb_new_project_automated(self, user_data: Any) -> None:
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self) -> None:
self._flush_to_project()
self._flush_to_config()
models.save_config(self.config)
self.ai_status = "config saved"
def _do_project_switch(self, path: str) -> None:
try:
self._flush_to_project()
try:
new_project = project_manager.load_project(path)
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
try:
self.project = new_project
self.active_project_path = path
new_root = Path(path).parent
self.preset_manager = presets.PresetManager(new_root)
self.tool_preset_manager = tool_presets.ToolPresetManager(new_root)
from src.personas import PersonaManager
self.persona_manager = PersonaManager(new_root)
except Exception as e:
self.ai_status = f"failed to init managers: {e}"
return
self._refresh_from_project()
file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files]
mcp_client.configure(file_items_as_dicts, [str(new_root)])
self.ai_status = f"switched to: {Path(path).stem}"
finally:
with self._project_switch_lock:
pending = self._project_switch_pending_path
self._project_switch_in_progress = False
self._project_switch_pending_path = None
if pending and pending != self.active_project_path and Path(pending).exists():
self._switch_project(pending)
def _switch_project(self, path: str) -> None:
"""
[C: src/gui_2.py:App._render_projects_panel]
Non-blocking: returns immediately, marks the controller as stale,
and runs the actual save/load work in a background thread so the
render loop keeps drawing and lightweight UI interactions (scrolling,
selecting tabs) remain responsive.
"""
if path == self.active_project_path and not self.is_project_stale():
return
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
with self._project_switch_lock:
if self._project_switch_in_progress:
if self._project_switch_pending_path == path:
return
self._project_switch_pending_path = path
self.ai_status = f"switch queued: {Path(path).stem} (waiting on {Path(self._project_switch_pending_path or '').stem})"
return
self._project_switch_in_progress = True
self._project_switch_pending_path = path
self.ai_status = f"switching to: {Path(path).stem} (stale ui - ops disabled)"
self.submit_io(self._do_project_switch, path)
def _refresh_from_project(self) -> None:
# Deserialize FileItems in files.paths
@@ -3394,9 +3159,9 @@ class AppController:
except Exception as e:
self.ai_status = f"save error: {e}"
#endregion: Project
#endregion: Project
#region: AI Settings
#region: AI Settings
def _apply_preset(self, name: str, scope: str) -> None:
"""
@@ -3513,10 +3278,16 @@ class AppController:
self.view_presets = [vp for vp in self.view_presets if vp.name != name]
self._flush_to_project()
#endregion: AI Settings
#endregion: AI Settings
#region: Discusssion
def _cb_disc_create(self) -> None:
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _get_discussion_names(self) -> list[str]:
"""
[C: src/gui_2.py:App._render_discussion_selector, src/gui_2.py:App._render_theme_panel]
@@ -3864,6 +3635,243 @@ class AppController:
#endregion: Discusssion
#region: Operations
def _offload_entry_payload(self, entry: Dict[str, Any]) -> Dict[str, Any]:
optimized = copy.deepcopy(entry)
kind = optimized.get("kind")
payload = optimized.get("payload", {})
if kind == "tool_result" and "output" in payload:
output = payload["output"]
ref_path = session_logger.log_tool_output(output)
if ref_path:
filename = Path(ref_path).name
payload["output"] = f"[REF:{filename}]"
if kind == "tool_call" and "script" in payload:
script = payload["script"]
ref_path = session_logger.log_tool_call(script, "LOG_ONLY", None)
if ref_path:
filename = Path(ref_path).name
payload["script"] = f"[REF:{filename}]"
return optimized
def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None:
"""
[C: tests/test_gui_updates.py:test_gui_updates_on_event]
"""
payload = kwargs.get("payload", {})
# Push to background event queue, NOT GUI queue
self.event_queue.put("refresh_api_metrics", payload)
if self.test_hooks_enabled:
with self._api_event_queue_lock:
self._api_event_queue.append({"type": event_name, "payload": payload})
def _on_ai_stream(self, text: str) -> None:
"""Handles streaming text from the AI."""
self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"})
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
"""
[C: tests/test_app_controller_offloading.py:test_on_comms_entry_tool_result_offloading]
"""
optimized_entry = self._offload_entry_payload(entry)
session_logger.log_comms(optimized_entry)
entry["local_ts"] = time.time()
kind = entry.get("kind")
payload = entry.get("payload", {})
if kind == "response" and "usage" in payload:
u = payload["usage"]
inp = u.get("input_tokens") or u.get("prompt_tokens") or 0
out = u.get("output_tokens") or u.get("completion_tokens") or 0
cache_read = u.get("cache_read_input_tokens") or 0
cache_create = u.get("cache_creation_input_tokens") or 0
total = u.get("total_tokens") or 0
# Store normalized usage back in payload for history rendering
u["input_tokens"] = inp
u["output_tokens"] = out
u["cache_read_input_tokens"] = cache_read
self.session_usage["input_tokens"] += inp
self.session_usage["output_tokens"] += out
self.session_usage["cache_read_input_tokens"] += cache_read
self.session_usage["cache_creation_input_tokens"] += cache_create
self.session_usage["total_tokens"] += total
input_t = u.get("input_tokens") or 0
output_t = u.get("output_tokens") or 0
model = payload.get("model", "unknown")
self._token_history.append({
"time": time.time(),
"input": input_t,
"output": output_t,
"model": model
})
if kind == "request":
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": payload.get("message", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "response":
if self.ui_auto_add_history:
role = payload.get("role", "AI")
text_content = payload.get("text", "")
if text_content.strip():
segments, parsed_response = thinking_parser.parse_thinking_trace(text_content)
entry_obj = {
"role": role,
"content": parsed_response.strip() if parsed_response else "",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
}
if "usage" in payload:
entry_obj["usage"] = payload["usage"]
if segments:
entry_obj["thinking_segments"] = [{"content": s.content, "marker": s.marker} for s in segments]
if entry_obj["content"] or segments:
with self._pending_history_adds_lock:
self._pending_history_adds.append(entry_obj)
if kind in ("tool_result", "tool_call"):
if self.ui_auto_add_history:
role = "Tool" if kind == "tool_result" else "Vendor API"
content = ""
if kind == "tool_result":
content = payload.get("output", "")
else:
content = payload.get("script") or payload.get("args") or payload.get("message", "")
if isinstance(content, dict):
content = json.dumps(content, indent=1)
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": f"[{kind.upper().replace('_', ' ')}]\n{content}",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None:
"""
[C: tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_has_source_tier, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_keys, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_stores_dict]
"""
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
tool_name = self._extract_tool_name(script)
is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower()
if tool_name:
if tool_name not in self._tool_stats:
self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0}
self._tool_stats[tool_name]["count"] += 1
self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms
if is_failure:
self._tool_stats[tool_name]["failures"] += 1
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
"""
[C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch]
"""
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self.ai_status = "powershell done, awaiting AI..."
return output
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
approved, final_script = dialog.wait()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
elif self._pending_dialog and self._pending_dialog._uid == action_id:
dialog = self._pending_dialog
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
return False
def _extract_tool_name(self, script: str) -> str:
if not script:
return "unknown"
script_lower = script.lower()
if "powershell" in script_lower or "run_powershell" in script_lower:
return "run_powershell"
if "read_file" in script_lower:
return "read_file"
if "write_file" in script_lower or "write" in script_lower:
return "write_file"
if "list_directory" in script_lower or "ls" in script_lower:
return "list_directory"
if "search_files" in script_lower or "glob" in script_lower:
return "search_files"
if "web_search" in script_lower:
return "web_search"
if "fetch_url" in script_lower:
return "fetch_url"
if "py_get" in script_lower:
return "py_get_skeleton"
return "other"
#endregion: Operations
#region MMA (Controller)
def _cb_plan_epic(self) -> None: