import copy import inspect import json import os import re import requests import sys import threading import time import tomli_w import traceback import uuid from dataclasses import asdict from datetime import datetime from fastapi import FastAPI, Depends, HTTPException from fastapi.security.api_key import APIKeyHeader from pathlib import Path from typing import Any, List, Dict, Optional, Callable from src import aggregate from src import models from src.models import GenerateRequest, ConfirmRequest from src import ai_client from src import conductor_tech_lead from src import events from src import mcp_client from src import multi_agent_conductor from src import orchestrator_pm from src import paths from src import performance_monitor from src import project_manager from src import session_logger from src import workspace_manager from src import presets from src import shell_runner from src import theme_2 as theme from src import thinking_parser from src import tool_presets from src.context_presets import ContextPresetManager from src.file_cache import ASTParser def parse_symbols(text: str) -> list[str]: """ Finds all occurrences of '@SymbolName' in text and returns SymbolName. SymbolName can be a function, class, or method (e.g. @MyClass, @my_func, @MyClass.my_method). [C: tests/test_symbol_lookup.py:TestSymbolLookup.test_parse_symbols_basic, tests/test_symbol_lookup.py:TestSymbolLookup.test_parse_symbols_edge_cases, tests/test_symbol_lookup.py:TestSymbolLookup.test_parse_symbols_methods, tests/test_symbol_lookup.py:TestSymbolLookup.test_parse_symbols_mixed, tests/test_symbol_lookup.py:TestSymbolLookup.test_parse_symbols_no_symbols] """ return re.findall(r"@([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)", text) def get_symbol_definition(symbol: str, files: list[str]) -> tuple[str, str, int] | None: """ [C: tests/test_symbol_lookup.py:TestSymbolLookup.test_get_symbol_definition_found, tests/test_symbol_lookup.py:TestSymbolLookup.test_get_symbol_definition_not_found] """ for file_path in files: result = mcp_client.py_get_symbol_info(file_path, symbol) if isinstance(result, tuple): source, line = result return (file_path, source, line) return None class ConfirmDialog: def __init__(self, script: str, base_dir: str) -> None: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self._uid = str(uuid.uuid4()) self._script = str(script) if script is not None else "" self._base_dir = str(base_dir) if base_dir is not None else "" self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: """ [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] """ start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._script self._condition.wait(timeout=0.1) return self._approved, self._script class MMAApprovalDialog: def __init__(self, ticket_id: str, payload: str) -> None: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self._payload = payload self._condition = threading.Condition() self._done = False self._approved = False def wait(self) -> tuple[bool, str]: """ [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] """ start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return False, self._payload self._condition.wait(timeout=0.1) return self._approved, self._payload class MMASpawnApprovalDialog: def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self._prompt = prompt self._context_md = context_md self._condition = threading.Condition() self._done = False self._approved = False self._abort = False def wait(self) -> dict[str, Any]: """ [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] """ start_time = time.time() with self._condition: while not self._done: if time.time() - start_time > 120: return {'approved': False, 'abort': True, 'prompt': self._prompt, 'context_md': self._context_md} self._condition.wait(timeout=0.1) return { 'approved': self._approved, 'abort': self._abort, 'prompt': self._prompt, 'context_md': self._context_md } #region: API Handlers async def _api_get_key(controller: 'AppController', header_key: str) -> str: """ Validates the API key from the request header against configuration. [SDM: src/app_controller.py:_api_get_key] """ headless_cfg = controller.config.get("headless", {}) config_key = headless_cfg.get("api_key", "").strip() env_key = os.environ.get("SLOP_API_KEY", "").strip() target_key = env_key or config_key if not target_key: raise HTTPException(status_code=403, detail="API Key not configured on server") if header_key == target_key: return header_key raise HTTPException(status_code=403, detail="Could not validate API Key") def _api_health(controller: 'AppController') -> dict[str, str]: """ Returns the health status of the API. [SDM: src/app_controller.py:_api_health] """ return {"status": "ok"} def _api_get_gui_state(controller: 'AppController') -> dict[str, Any]: """ Returns the current GUI state for specific fields. [SDM: src/app_controller.py:_api_get_gui_state] """ gettable = getattr(controller, "_gettable_fields", {}) state = {} import dataclasses for key, attr in gettable.items(): val = getattr(controller, attr, None) if dataclasses.is_dataclass(val): state[key] = dataclasses.asdict(val) else: state[key] = val return state def _api_get_mma_status(controller: 'AppController') -> dict[str, Any]: """ Dedicated endpoint for MMA-related status. [SDM: src/app_controller.py:_api_get_mma_status] """ return { "mma_status": controller.mma_status, "ai_status": controller.ai_status, "mma_streams": controller.mma_streams, "worker_status": controller._worker_status, "tool_stats": controller._tool_stats, "active_tier": controller.active_tier, "active_tickets": controller.active_tickets, "proposed_tracks": controller.proposed_tracks, "tracks": controller.tracks, "tier_usage": controller.mma_tier_usage } def _api_post_gui(controller: 'AppController', req: dict) -> dict[str, str]: """ Pushes a GUI task to the event queue. [SDM: src/app_controller.py:_api_post_gui] """ controller.event_queue.put("gui_task", req) return {"status": "queued"} def _api_get_api_session(controller: 'AppController') -> dict[str, Any]: """ Returns current discussion session entries. [SDM: src/app_controller.py:_api_get_api_session] """ with controller._disc_entries_lock: return {"session": {"entries": controller.disc_entries}} def _api_post_api_session(controller: 'AppController', req: dict) -> dict[str, str]: """ Updates session entries. [SDM: src/app_controller.py:_api_post_api_session] """ entries = req.get("entries", []) with controller._disc_entries_lock: controller.disc_entries = entries return {"status": "updated"} def _api_get_api_project(controller: 'AppController') -> dict[str, Any]: """ Returns current project data. [SDM: src/app_controller.py:_api_get_api_project] """ return {"project": controller.project} def _api_get_performance(controller: 'AppController') -> dict[str, Any]: """ Returns performance monitor metrics. [SDM: src/app_controller.py:_api_get_performance] """ return {"performance": controller.perf_monitor.get_metrics()} def _api_get_diagnostics(controller: 'AppController') -> dict[str, Any]: """ Alias for performance metrics. [SDM: src/app_controller.py:_api_get_diagnostics] """ return controller.perf_monitor.get_metrics() def _api_status(controller: 'AppController') -> dict[str, Any]: """ Returns the current status of the application. [SDM: src/app_controller.py:_api_status] """ return { "provider": controller.current_provider, "model": controller.current_model, "status": controller.ai_status, "usage": controller.session_usage } def _api_generate(controller: 'AppController', req: GenerateRequest) -> dict[str, Any]: """ Triggers an AI generation request using the current project context. [SDM: src/app_controller.py:_api_generate] """ if not req.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") with controller._send_thread_lock: start_time = time.time() try: # 1. Build context from src import aggregate full_md, path, file_items, stable_md, disc_text = controller._do_generate() controller._last_stable_md = stable_md controller.last_md = full_md controller.last_md_path = path controller.last_file_items = file_items user_msg = req.prompt # 2. RAG Retrieval if controller.rag_engine and controller.rag_config and controller.rag_config.enabled: try: chunks = controller.rag_engine.search(user_msg) if chunks: context_block = "## Retrieved Context\n\n" for i, chunk in enumerate(chunks): path = chunk.get("metadata", {}).get("path", "unknown") context_block += f"### Chunk {i+1} (Source: {path})\n{chunk.get('document', '')}\n\n" user_msg = context_block + user_msg except Exception as e: sys.stderr.write(f"RAG search error: {e}\n") sys.stderr.flush() # 3. Symbol Resolution try: from src.markdown_helper import parse_symbols, get_symbol_definition symbols = parse_symbols(user_msg) file_paths = [f.path if hasattr(f, "path") else f.get("path") if isinstance(f, dict) else str(f) for f in controller.last_file_items] for symbol in symbols: res = get_symbol_definition(symbol, file_paths) if res: file_path, definition, line = res user_msg += f'\n\n[Definition: {symbol} from {file_path} (line {line})]\n```python\n{definition}\n```' except Exception as e: sys.stderr.write(f"Symbol resolution error: {e}\n") sys.stderr.flush() base_dir = controller.active_project_root csp = filter(bool, [controller.ui_global_system_prompt.strip(), controller.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_base_system_prompt(controller.ui_base_system_prompt) ai_client.set_use_default_base_prompt(controller.ui_use_default_base_prompt) ai_client.set_project_context_marker(controller.ui_project_context_marker) temp = req.temperature if req.temperature is not None else controller.temperature top_p = req.top_p if req.top_p is not None else controller.top_p tokens = req.max_tokens if req.max_tokens is not None else controller.max_tokens ai_client.set_model_params(temp, tokens, controller.history_trunc_limit, top_p) ai_client.set_agent_tools(controller.ui_agent_tools) if req.auto_add_history: with controller._pending_history_adds_lock: controller._pending_history_adds.append({ "role": "User", "content": user_msg, "collapsed": True, "ts": project_manager.now_ts() }) try: resp = ai_client.send(stable_md, user_msg, base_dir, controller.last_file_items, disc_text, rag_engine=None) if req.auto_add_history: with controller._pending_history_adds_lock: controller._pending_history_adds.append({ "role": "AI", "content": resp, "collapsed": True, "ts": project_manager.now_ts() }) controller._recalculate_session_usage() duration = time.time() - start_time return { "text": resp, "metadata": { "provider": controller.current_provider, "model": controller.current_model, "duration_sec": round(duration, 3), "timestamp": project_manager.now_ts() }, "usage": controller.session_usage } except ai_client.ProviderError as e: raise HTTPException(status_code=500, detail=e.ui_message()) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) except Exception as e: import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) except ai_client.ProviderError as e: raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") except Exception as e: raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") async def _api_stream(controller: 'AppController', req: GenerateRequest) -> Any: """ Placeholder for streaming AI generation responses (Not yet implemented). [SDM: src/app_controller.py:_api_stream] """ raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.") def _api_pending_actions(controller: 'AppController') -> list[dict[str, Any]]: """ Lists all pending PowerShell scripts awaiting confirmation. [SDM: src/app_controller.py:_api_pending_actions] """ with controller._pending_dialog_lock: return [ {"action_id": uid, "script": diag._script, "base_dir": diag._base_dir} for uid, diag in controller._pending_actions.items() ] def _api_confirm_action(controller: 'AppController', action_id: str, req: ConfirmRequest) -> dict[str, str]: """ Approves or rejects a pending action. [SDM: src/app_controller.py:_api_confirm_action] """ with controller._pending_dialog_lock: if action_id not in controller._pending_actions: raise HTTPException(status_code=404, detail="Action not found") dialog = controller._pending_actions.pop(action_id) if req.script is not None: dialog._script = req.script with dialog._condition: dialog._approved = req.approved dialog._done = True dialog._condition.notify_all() return {"status": "confirmed" if req.approved else "rejected"} def _api_list_sessions(controller: 'AppController') -> list[str]: """ Lists all session IDs. [SDM: src/app_controller.py:_api_list_sessions] """ log_dir = paths.get_logs_dir() if not log_dir.exists(): return [] return [d.name for d in log_dir.iterdir() if d.is_dir()] def _api_get_session(controller: 'AppController', session_id: str) -> dict[str, Any]: """ Returns the content of the comms.log for a specific session. [SDM: src/app_controller.py:_api_get_session] """ log_path = paths.get_logs_dir() / session_id / "comms.log" if not log_path.exists(): raise HTTPException(status_code=404, detail="Session log not found") return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")} def _api_delete_session(controller: 'AppController', session_id: str) -> dict[str, str]: """ Deletes a specific session directory. [SDM: src/app_controller.py:_api_delete_session] """ log_path = paths.get_logs_dir() / session_id if not log_path.exists() or not log_path.is_dir(): raise HTTPException(status_code=404, detail="Session directory not found") import shutil shutil.rmtree(log_path) return {"status": "deleted"} def _api_get_context(controller: 'AppController') -> dict[str, Any]: """ Returns the current aggregated project context. [SDM: src/app_controller.py:_api_get_context] """ try: md, path, file_items, stable_md, disc_text = controller._do_generate() screenshots = controller.project.get("screenshots", {}).get("paths", []) return { "files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items], "screenshots": screenshots, "files_base_dir": controller.active_project_root, "markdown": md, "discussion": disc_text } except Exception as e: raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}") def _api_token_stats(controller: 'AppController') -> dict[str, Any]: """ Returns current token usage and budget statistics. [SDM: src/app_controller.py:_api_token_stats] """ return controller._token_stats #endregion #region: GUI Task Handlers def _handle_ai_response(controller: 'AppController', task: dict): """[SDM: AppController._handle_ai_response]""" payload = task.get("payload", {}) text = payload.get("text", "") stream_id = payload.get("stream_id") is_streaming = payload.get("status") == "streaming..." if stream_id: if is_streaming: if stream_id not in controller.mma_streams: controller.mma_streams[stream_id] = "" if stream_id not in controller._worker_status: controller._worker_status[stream_id] = "running" controller.mma_streams[stream_id] += text if len(controller.mma_streams[stream_id]) > controller.MAX_STREAM_SIZE: controller.mma_streams[stream_id] = controller.mma_streams[stream_id][-controller.MAX_STREAM_SIZE:] else: controller.mma_streams[stream_id] = text if stream_id in controller._worker_status and controller._worker_status[stream_id] == "running": controller._worker_status[stream_id] = "completed" if stream_id == "Tier 1": if "status" in payload: controller._ai_status = payload["status"] else: if is_streaming: controller.ai_response += text else: controller.ai_response = text controller._ai_status = payload.get("status", "done") controller._trigger_blink = True if not stream_id: controller._token_stats_dirty = True if not is_streaming: controller._autofocus_response_tab = True def _handle_mma_state_update(controller: 'AppController', task: dict): """[SDM: AppController._handle_mma_state_update]""" p = task.get("payload") if not isinstance(p, dict): p = task track_data = p.get("track") is_active_track = False if track_data and controller.active_track and track_data.get("id") == controller.active_track.id: is_active_track = True if is_active_track or not controller.active_track: controller._mma_status = p.get("status", controller._mma_status) old_tier = controller.active_tier controller.active_tier = p.get("active_tier", controller.active_tier) if getattr(controller, "ui_auto_switch_layout", False) and controller.active_tier and controller.active_tier != old_tier: for tier_prefix in ["Tier 1", "Tier 2", "Tier 3", "Tier 4"]: if controller.active_tier.startswith(tier_prefix): bound_profile = getattr(controller, "ui_tier_layout_bindings", {}).get(tier_prefix) if bound_profile: controller._cb_load_workspace_profile(bound_profile) break new_usage = p.get("tier_usage", {}) for tier, data in new_usage.items(): if tier in controller.mma_tier_usage: input_val = data.get("input") if input_val is not None: controller.mma_tier_usage[tier]["input"] = input_val output_val = data.get("output") if output_val is not None: controller.mma_tier_usage[tier]["output"] = output_val if "model" in data: controller.mma_tier_usage[tier]["model"] = data["model"] if "provider" in data: controller.mma_tier_usage[tier]["provider"] = data["provider"] else: controller.mma_tier_usage[tier] = data if is_active_track or not controller.active_track: controller.active_tickets = p.get("tickets", []) if track_data: tickets = [] for t_data in controller.active_tickets: if isinstance(t_data, models.Ticket): tickets.append(t_data) else: if "goal" in t_data and "description" not in t_data: t_data["description"] = t_data["goal"] tickets.append(models.Ticket.from_dict(t_data)) controller.active_track = models.Track( id=track_data.get("id"), description=track_data.get("title", ""), tickets=tickets ) def _handle_refresh_api_metrics(controller: 'AppController', task: dict): """[SDM: AppController._handle_refresh_api_metrics]""" controller._refresh_api_metrics(task.get("payload", {}), md_content=controller.last_md or None) def _handle_set_ai_status(controller: 'AppController', task: dict): """[SDM: AppController._handle_set_ai_status]""" controller._ai_status = task.get("value", task.get("payload", "")) def _handle_set_mma_status(controller: 'AppController', task: dict): """[SDM: AppController._handle_set_mma_status]""" controller._mma_status = task.get("value", task.get("payload", "")) def _handle_mma_stream(controller: 'AppController', task: dict): """[SDM: AppController._handle_mma_stream]""" stream_id = task.get("stream_id") or task.get("payload", {}).get("stream_id") text = task.get("text") or task.get("payload", {}).get("text", "") if stream_id: if stream_id not in controller.mma_streams: controller.mma_streams[stream_id] = "" controller.mma_streams[stream_id] += text def _handle_show_track_proposal(controller: 'AppController', task: dict): """[SDM: AppController._handle_show_track_proposal]""" controller.proposed_tracks = task.get("payload", []) controller._show_track_proposal_modal = True def _handle_custom_callback(controller: 'AppController', task: dict): """[SDM: AppController._handle_custom_callback]""" cb = task.get("callback") args = task.get("args", []) if callable(cb): try: cb(*args) except Exception as e: print(f"Error in direct custom callback: {e}") elif cb in controller._predefined_callbacks: controller._predefined_callbacks[cb](*args) def _handle_set_value(controller: 'AppController', task: dict): """[SDM: AppController._handle_set_value]""" item = task.get("item") value = task.get("value") if item in controller._settable_fields: attr_name = controller._settable_fields[item] setattr(controller, attr_name, value) if item == "gcli_path": controller._update_gcli_adapter(str(value)) def _handle_click(controller: 'AppController', task: dict): """[SDM: AppController._handle_click]""" item = task.get("item") user_data = task.get("user_data") if item == "btn_project_new_automated": controller._cb_new_project_automated(user_data) elif item == "btn_mma_load_track": controller._cb_load_track(str(user_data or "")) elif item in controller._clickable_actions: func = controller._clickable_actions[item] try: sig = inspect.signature(func) if 'user_data' in sig.parameters: func(user_data=user_data) else: func() except Exception: func() def _handle_drag(controller: 'AppController', task: dict): """[SDM: AppController._handle_drag]""" src_item = task.get("src_item") dst_item = task.get("dst_item") if src_item in controller._drag_actions: func = controller._drag_actions[src_item] func(dst_item=dst_item) def _handle_right_click(controller: 'AppController', task: dict): """[SDM: AppController._handle_right_click]""" item = task.get("item") if item in controller._right_clickable_actions: func = controller._right_clickable_actions[item] func() def _handle_select_list_item(controller: 'AppController', task: dict): """[SDM: AppController._handle_select_list_item]""" item = task.get("listbox", task.get("item")) value = task.get("item_value", task.get("value")) if item == "disc_listbox": controller._switch_discussion(str(value or "")) def _handle_ask(controller: 'AppController', task: dict): """[SDM: AppController._handle_ask]""" controller._pending_ask_dialog = True controller._ask_request_id = task.get("request_id") controller._ask_tool_data = task.get("data", {}) def _handle_clear_ask(controller: 'AppController', task: dict): """[SDM: AppController._handle_clear_ask]""" if controller._ask_request_id == task.get("request_id"): controller._pending_ask_dialog = False controller._ask_request_id = None controller._ask_tool_data = None def _handle_mma_step_approval(controller: 'AppController', task: dict): """[SDM: AppController._handle_mma_step_approval]""" if controller.test_hooks_enabled and not getattr(controller, "ui_manual_approve", False): if "dialog_container" in task: class AutoStepDialog: def __init__(self, t): self.t = t def wait(self): return True, self.t.get("payload", "") task["dialog_container"][0] = AutoStepDialog(task) return dlg = MMAApprovalDialog(str(task.get("ticket_id") or ""), str(task.get("payload") or "")) controller._pending_mma_approvals.append(task) if "dialog_container" in task: task["dialog_container"][0] = dlg def _handle_mma_spawn_approval(controller: 'AppController', task: dict): """[SDM: AppController._handle_mma_spawn_approval]""" if controller.test_hooks_enabled and not getattr(controller, "ui_manual_approve", False): if "dialog_container" in task: class AutoSpawnDialog: def __init__(self, t): self.t = t def wait(self): """ [C: src/mcp_client.py:StdioMCPServer.stop, src/multi_agent_conductor.py:confirm_execution, src/multi_agent_conductor.py:confirm_spawn, tests/conftest.py:live_gui, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_ai_server.py:test_server_handles_list_models, tests/test_ai_server.py:test_server_handles_unknown_method, tests/test_ai_server.py:test_server_loads_google_genai_quickly, tests/test_ai_server.py:test_server_outputs_ready_marker, tests/test_ai_server.py:test_server_starts_and_exits_cleanly, tests/test_conductor_engine_abort.py:worker, tests/test_parallel_execution.py:test_worker_pool_limit] """ return {'approved': True, 'abort': False, 'prompt': self.t.get("prompt"), 'context_md': self.t.get("context_md")} task["dialog_container"][0] = AutoSpawnDialog(task) return spawn_dlg = MMASpawnApprovalDialog( str(task.get("ticket_id") or ""), str(task.get("role") or ""), str(task.get("prompt") or ""), str(task.get("context_md") or "") ) controller._pending_mma_spawns.append(task) controller._mma_spawn_prompt = task.get("prompt", "") controller._mma_spawn_context = task.get("context_md", "") controller._mma_spawn_open = True controller._mma_spawn_edit_mode = False if "dialog_container" in task: task["dialog_container"][0] = spawn_dlg def _handle_ticket_started(controller: 'AppController', task: dict): """[SDM: AppController._handle_ticket_started]""" payload = task.get("payload", {}) ticket_id = payload.get("ticket_id") start_time = payload.get("timestamp") persona_id = payload.get("persona_id") model = payload.get("model") if ticket_id and start_time: controller._ticket_start_times[ticket_id] = start_time if ticket_id and (persona_id or model): stream_id = f"Tier 3 (Worker): {ticket_id}" meta_info = f"[STARTED] Ticket: {ticket_id}" if model: meta_info += f" | Model: {model}" if persona_id: meta_info += f" | Persona: {persona_id}" meta_info += "\n" + "="*50 + "\n" if stream_id not in controller.mma_streams: controller.mma_streams[stream_id] = "" controller.mma_streams[stream_id] = meta_info + controller.mma_streams[stream_id] def _handle_ticket_completed(controller: 'AppController', task: dict): """[SDM: AppController._handle_ticket_completed]""" payload = task.get("payload", {}) ticket_id = payload.get("ticket_id") end_time = payload.get("timestamp") if ticket_id and end_time and ticket_id in controller._ticket_start_times: start_time = controller._ticket_start_times.pop(ticket_id) elapsed = end_time - start_time controller._completed_ticket_count += 1 controller._avg_ticket_time = ((controller._avg_ticket_time * (controller._completed_ticket_count - 1)) + elapsed) / controller._completed_ticket_count def _handle_bead_updated(controller: 'AppController', task: dict): """[SDM: AppController._handle_bead_updated]""" payload = task.get("payload", {}) bead_id = payload.get("bead_id") or payload.get("bid") status = payload.get("status") if bead_id and status: stream_id = "Tier 2 (Tech Lead)" msg = f"\n[BEAD UPDATE] {bead_id} -> status: {status}\n" if stream_id not in controller.mma_streams: controller.mma_streams[stream_id] = "" controller.mma_streams[stream_id] += msg def _handle_set_comms_dirty(controller: 'AppController', task: dict): """[SDM: AppController._handle_set_comms_dirty]""" controller._comms_log_dirty = True def _handle_set_tool_log_dirty(controller: 'AppController', task: dict): """[SDM: AppController._handle_set_tool_log_dirty]""" controller._tool_log_dirty = True def _handle_refresh_from_project(controller: 'AppController', task: dict): """[SDM: AppController._handle_refresh_from_project]""" controller._refresh_from_project() def _handle_show_patch_modal(controller: 'AppController', task: dict): """[SDM: AppController._handle_show_patch_modal]""" controller._pending_patch_text = task.get("patch_text", "") controller._pending_patch_files = task.get("file_paths", []) controller._show_patch_modal = True def _handle_hide_patch_modal(controller: 'AppController', task: dict): """[SDM: AppController._handle_hide_patch_modal]""" controller._show_patch_modal = False controller._pending_patch_text = None controller._pending_patch_files = [] #endregion class AppController: """ The headless controller for the Manual Slop application. Owns the application state and manages background services. """ def __init__(self): """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ # --- Locks --- self._send_thread_lock: threading.Lock = threading.Lock() self._disc_entries_lock: threading.Lock = threading.Lock() self._pending_comms_lock: threading.Lock = threading.Lock() self._pending_tool_calls_lock: threading.Lock = threading.Lock() self._pending_history_adds_lock: threading.Lock = threading.Lock() self._pending_gui_tasks_lock: threading.Lock = threading.Lock() self._pending_dialog_lock: threading.Lock = threading.Lock() self._api_event_queue_lock: threading.Lock = threading.Lock() self._rag_engine_lock: threading.Lock = threading.Lock() # --- Internal State --- self._ai_status: str = "idle" self._mma_status: str = "idle" self._pending_gui_tasks: List[Dict[str, Any]] = [] self._api_event_queue: List[Dict[str, Any]] = [] self._pending_dialog: Optional[ConfirmDialog] = None self._pending_dialog_open: bool = False self._pending_actions: Dict[str, ConfirmDialog] = {} self._pending_ask_dialog: bool = False self._loop_thread: Optional[threading.Thread] = None self._worker_status: Dict[str, str] = {} self._pending_patch_text: Optional[str] = None self._pending_patch_files: List[str] = [] self._show_patch_modal: bool = False self._patch_error_message: Optional[str] = None self._tool_log: List[Dict[str, Any]] = [] self._tool_stats: Dict[str, Dict[str, Any]] = {} self._cached_cache_stats: Dict[str, Any] = {} self._cached_files: List[str] = [] self._token_history: List[Dict[str, Any]] = [] self._session_start_time: float = time.time() self._ticket_start_times: dict[str, float] = {} self._avg_ticket_time: float = 0.0 self._completed_ticket_count: int = 0 self._comms_log: List[Dict[str, Any]] = [] self._last_telemetry_time: float = 0.0 self._current_provider: str = "gemini" self._current_model: str = "gemini-2.5-flash-lite" self._autofocus_response_tab = False self._show_track_proposal_modal: bool = False self._pending_comms: List[Dict[str, Any]] = [] self._pending_tool_calls: List[Dict[str, Any]] = [] self._pending_history_adds: List[Dict[str, Any]] = [] self._perf_last_update: float = 0.0 self._last_autosave: float = time.time() self._ask_dialog_open: bool = False self._ask_request_id: Optional[str] = None self._ask_tool_data: Optional[Dict[str, Any]] = None self._pending_mma_approvals: List[Dict[str, Any]] = [] self._mma_approval_open: bool = False self._mma_approval_edit_mode: bool = False self._mma_approval_payload: str = "" self._pending_mma_spawns: List[Dict[str, Any]] = [] self._mma_spawn_open: bool = False self._mma_spawn_edit_mode: bool = False self._mma_spawn_prompt: str = '' self._mma_spawn_context: str = '' self._trigger_blink: bool = False self._is_blinking: bool = False self._blink_start_time: float = 0.0 self._trigger_script_blink: bool = False self._is_script_blinking: bool = False self._script_blink_start_time: float = 0.0 self._scroll_disc_to_bottom: bool = False self._scroll_comms_to_bottom: bool = False self._scroll_tool_calls_to_bottom: bool = False self._gemini_cache_text: str = "" self._last_stable_md: str = '' self._token_stats: Dict[str, Any] = {} self._comms_log_dirty: bool = True self._tool_log_dirty: bool = True self._token_stats_dirty: bool = True self._tier_stream_last_len: Dict[str, int] = {} self._current_session_usage = None self._current_mma_tier_usage = None self._current_token_history = None self._current_session_start_time = None self._inject_file_path: str = "" self._inject_mode: str = "skeleton" self._inject_preview: str = "" self._show_inject_modal: bool = False self._editing_preset_name: str = "" self._editing_preset_content: str = "" self._editing_preset_temperature: float = 0.0 self._editing_preset_top_p: float = 0.0 self._editing_preset_max_output_tokens: int = 4096 self._editing_preset_scope: str = "project" self._editing_tool_preset_name: str = "" self._editing_tool_preset_categories: Dict[str, Dict[str, Any]] = {} self._editing_tool_preset_scope: str = "project" self._show_base_prompt_diff_modal: bool = False self._autosave_interval: float = 60.0 self._show_add_ticket_form: bool = False self._track_discussion_active: bool = False # --- Core State --- self.config: Dict[str, Any] = {} self.project: Dict[str, Any] = {} self.active_project_path: str = "" self.project_paths: List[str] = [] self.active_discussion: str = "main" self.disc_entries: List[Dict[str, Any]] = [] self.discussion_sent_markdown: str = "" self.discussion_sent_system_prompt: str = "" self.disc_roles: List[str] = [] self.tracks: List[Dict[str, Any]] = [] self.active_track: Optional[models.Track] = None self.engines: Dict[str, multi_agent_conductor.ConductorEngine] = {} self.mma_streams: Dict[str, str] = {} self.MAX_STREAM_SIZE: int = 10 * 1024 self.session_usage: Dict[str, Any] = { "input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0, "percentage": 0.0 } self.mma_tier_usage: Dict[str, Dict[str, Any]] = { "Tier 1": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3.1-pro-preview", "tool_preset": None}, "Tier 2": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-3-flash-preview", "tool_preset": None}, "Tier 3": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None}, "Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None}, } self.mcp_config: models.MCPConfiguration = models.MCPConfiguration() self.view_presets: list[models.NamedViewPreset] = [] self.rag_config: Optional[models.RAGConfig] = None self.rag_status: str = 'idle' self.temperature: float = 0.0 self.top_p: float = 1.0 self.max_tokens: int = 8192 self.history_trunc_limit: int = 8000 self.ai_response: str = '' self.last_md: str = '' self.last_aggregate_markdown: str = '' self.last_resolved_system_prompt: str = '' self.last_md_path: Optional[Path] = None self.last_file_items: List[Any] = [] self.send_thread: Optional[threading.Thread] = None self.models_thread: Optional[threading.Thread] = None self.show_windows: Dict[str, bool] = {} self.show_script_output: bool = False self.text_viewer_title: str = '' self.text_viewer_content: str = '' self.text_viewer_type: str = 'text' self.perf_history: Dict[str, List[float]] = {'frame_time': [0.0]*100, 'fps': [0.0]*100, 'cpu': [0.0]*100, 'input_lag': [0.0]*100} self.mma_step_mode: bool = False self.active_tier: Optional[str] = None self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1") self.is_viewing_prior_session: bool = False self.prior_session_entries: List[Dict[str, Any]] = [] self.prior_tool_calls: List[Dict[str, Any]] = [] self.prior_disc_entries: List[Dict[str, Any]] = [] self.prior_mma_dashboard_state = {} self.diagnostic_log: List[Dict[str, Any]] = [] self.ui_active_tool_preset: str | None = None self.available_models: List[str] = [] self.all_available_models: Dict[str, List[str]] = {} self.proposed_tracks: List[Dict[str, Any]] = [] self.show_preset_manager_window: bool = False self.show_tool_preset_manager_window: bool = False self.show_persona_editor_window: bool = False # --- UI State --- self.ui_ai_input: str = "" self.ui_disc_new_name_input: str = "" self.ui_disc_new_role_input: str = "" self.ui_epic_input: str = "" self.ui_new_track_name: str = "" self.ui_new_track_desc: str = "" self.ui_new_track_type: str = "feature" self.ui_project_conductor_dir: str = "" self.ui_conductor_setup_summary: str = "" self.ui_last_script_text: str = "" self.ui_last_script_output: str = "" self.ui_new_ticket_id: str = "" self.ui_new_ticket_desc: str = "" self.ui_new_ticket_target: str = "" self.ui_new_ticket_deps: str = "" self.ui_output_dir: str = "" self.ui_files_base_dir: str = "" self.ui_shots_base_dir: str = "" self.ui_project_git_dir: str = "" self.ui_project_system_prompt: str = "" self.ui_project_execution_mode: str = "native" self.ui_gemini_cli_path: str = "gemini" self.ui_word_wrap: bool = True self.ui_auto_add_history: bool = False self.ui_separate_message_panel: bool = False self.ui_separate_response_panel: bool = False self.ui_separate_tool_calls_panel: bool = False self.ui_global_system_prompt: str = "" self.ui_base_system_prompt: str = "" self.ui_use_default_base_prompt: bool = True self.ui_project_context_marker: str = "" self.ui_agent_tools: Dict[str, bool] = {} self.ui_manual_approve: bool = False self.ui_disc_truncate_pairs: int = 2 self.ui_auto_scroll_comms: bool = True self.ui_auto_scroll_tool_calls: bool = True self.ui_focus_agent: Optional[str] = None self.ui_active_persona: str = "" # --- Media/Context --- self.files: List[models.FileItem] = [] self.context_files: List[models.FileItem] = [] self.screenshots: List[str] = [] # --- Services --- self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue() self.rag_engine: Optional[Any] = None # --- Configuration Maps --- self._settable_fields: Dict[str, str] = { 'ai_input': 'ui_ai_input', 'project_git_dir': 'ui_project_git_dir', 'auto_add_history': 'ui_auto_add_history', 'disc_new_name_input': 'ui_disc_new_name_input', 'gcli_path': 'ui_gemini_cli_path', 'output_dir': 'ui_output_dir', 'files_base_dir': 'ui_files_base_dir', 'files': 'ui_file_paths', 'screenshots': 'screenshots', 'ai_status': 'ai_status', 'ai_response': 'ai_response', 'active_discussion': 'active_discussion', 'current_provider': 'current_provider', 'current_model': 'current_model', 'token_budget_pct': '_token_budget_pct', 'token_budget_current': '_token_budget_current', 'token_budget_label': '_token_budget_label', 'show_confirm_modal': 'show_confirm_modal', 'mma_epic_input': 'ui_epic_input', 'mma_status': 'mma_status', 'rag_status': 'rag_status', 'rag_enabled': 'rag_enabled', 'rag_source': 'rag_source', 'rag_emb_provider': 'rag_emb_provider', 'rag_mcp_server': 'rag_mcp_server', 'rag_mcp_tool': 'rag_mcp_tool', 'rag_chunk_size': 'rag_chunk_size', 'rag_chunk_overlap': 'rag_chunk_overlap', 'rag_collection_name': 'rag_collection_name', 'mcp_config_json': 'mcp_config_json', 'mma_active_tier': 'active_tier', 'ui_new_track_name': 'ui_new_track_name', 'ui_new_track_desc': 'ui_new_track_desc', 'manual_approve': 'ui_manual_approve', 'global_system_prompt': 'ui_global_system_prompt', 'project_system_prompt': 'ui_project_system_prompt', 'base_system_prompt': 'ui_base_system_prompt', 'use_default_base_prompt': 'ui_use_default_base_prompt', 'show_base_prompt_diff_modal': '_show_base_prompt_diff_modal', 'global_preset_name': 'ui_global_preset_name', 'project_preset_name': 'ui_project_preset_name', 'ui_active_tool_preset': 'ui_active_tool_preset', 'ui_active_bias_profile': 'ui_active_bias_profile', 'temperature': 'temperature', 'max_tokens': 'max_tokens', 'show_preset_manager_window': 'show_preset_manager_window', 'show_tool_preset_manager_window': 'show_tool_preset_manager_window', 'show_persona_editor_window': 'show_persona_editor_window', '_editing_preset_name': '_editing_preset_name', '_editing_preset_content': '_editing_preset_content', '_editing_preset_temperature': '_editing_preset_temperature', '_editing_preset_top_p': '_editing_preset_top_p', '_editing_preset_max_output_tokens': '_editing_preset_max_output_tokens', '_editing_preset_scope': '_editing_preset_scope', '_editing_tool_preset_name': '_editing_tool_preset_name', '_editing_tool_preset_categories': '_editing_tool_preset_categories', '_editing_tool_preset_scope': '_editing_tool_preset_scope', 'show_windows': 'show_windows', 'ui_separate_task_dag': 'ui_separate_task_dag', 'ui_separate_usage_analytics': 'ui_separate_usage_analytics', 'ui_separate_tier1': 'ui_separate_tier1', 'ui_separate_tier2': 'ui_separate_tier2', 'ui_separate_tier3': 'ui_separate_tier3', 'ui_separate_tier4': 'ui_separate_tier4', 'text_viewer_title': 'text_viewer_title', 'text_viewer_type': 'text_viewer_type', 'disc_entries': 'disc_entries', 'ui_file_paths': 'ui_file_paths', 'ui_auto_switch_layout': 'ui_auto_switch_layout', 'ui_tier_layout_bindings': 'ui_tier_layout_bindings', 'ui_focus_agent': 'ui_focus_agent' } self._gettable_fields = dict(self._settable_fields) self._gettable_fields.update({ 'show_windows': 'show_windows', # Key 'show_windows' maps to field 'show_windows' on controller 'ui_focus_agent': 'ui_focus_agent', 'active_discussion': 'active_discussion', '_track_discussion_active': '_track_discussion_active', 'proposed_tracks': 'proposed_tracks', 'mma_streams': 'mma_streams', '_worker_status': '_worker_status', 'active_track': 'active_track', 'active_tickets': 'active_tickets', 'tracks': 'tracks', 'thinking_indicator': 'thinking_indicator', 'operations_live_indicator': 'operations_live_indicator', 'prior_session_indicator': 'prior_session_indicator', '_show_patch_modal': '_show_patch_modal', '_pending_patch_text': '_pending_patch_text', '_pending_patch_files': '_pending_patch_files', '_inject_file_path': '_inject_file_path', '_inject_mode': '_inject_mode', '_inject_preview': '_inject_preview', '_show_inject_modal': '_show_inject_modal', 'bg_shader_enabled': 'bg_shader_enabled', 'global_system_prompt': 'ui_global_system_prompt', 'project_system_prompt': 'ui_project_system_prompt', 'base_system_prompt': 'ui_base_system_prompt', 'use_default_base_prompt': 'ui_use_default_base_prompt', 'show_base_prompt_diff_modal': '_show_base_prompt_diff_modal', 'global_preset_name': 'ui_global_preset_name', 'project_preset_name': 'ui_project_preset_name', 'ui_active_tool_preset': 'ui_active_tool_preset', 'ui_active_bias_profile': 'ui_active_bias_profile', 'temperature': 'temperature', 'max_tokens': 'max_tokens', 'show_preset_manager_window': 'show_preset_manager_window', 'show_tool_preset_manager_window': 'show_tool_preset_manager_window', 'show_persona_editor_window': 'show_persona_editor_window', '_editing_preset_name': '_editing_preset_name', '_editing_preset_content': '_editing_preset_content', '_editing_preset_temperature': '_editing_preset_temperature', '_editing_preset_top_p': '_editing_preset_top_p', '_editing_preset_max_output_tokens': '_editing_preset_max_output_tokens', '_editing_preset_scope': '_editing_preset_scope', 'ui_separate_task_dag': 'ui_separate_task_dag', 'ui_separate_usage_analytics': 'ui_separate_usage_analytics', 'ui_separate_tier1': 'ui_separate_tier1', 'ui_separate_tier2': 'ui_separate_tier2', 'ui_separate_tier3': 'ui_separate_tier3', 'ui_separate_tier4': 'ui_separate_tier4', 'text_viewer_title': 'text_viewer_title', 'text_viewer_type': 'text_viewer_type' }) self.context_preset_manager = ContextPresetManager() self.perf_monitor = performance_monitor.get_monitor() self._perf_profiling_enabled = False self._gui_task_handlers: Dict[str, Callable] = { "refresh_api_metrics": _handle_refresh_api_metrics, "set_ai_status": _handle_set_ai_status, "set_mma_status": _handle_set_mma_status, "handle_ai_response": _handle_ai_response, "mma_stream": _handle_mma_stream, "mma_stream_append": _handle_mma_stream, "show_track_proposal": _handle_show_track_proposal, "mma_state_update": _handle_mma_state_update, "custom_callback": _handle_custom_callback, "set_value": _handle_set_value, "click": _handle_click, "drag": _handle_drag, "right_click": _handle_right_click, "select_list_item": _handle_select_list_item, "ask": _handle_ask, "clear_ask": _handle_clear_ask, "mma_step_approval": _handle_mma_step_approval, "mma_spawn_approval": _handle_mma_spawn_approval, "ticket_started": _handle_ticket_started, "ticket_completed": _handle_ticket_completed, "bead_updated": _handle_bead_updated, "set_comms_dirty": _handle_set_comms_dirty, "set_tool_log_dirty": _handle_set_tool_log_dirty, "refresh_from_project": _handle_refresh_from_project, "show_patch_modal": _handle_show_patch_modal, "hide_patch_modal": _handle_hide_patch_modal, } self._init_actions() @property def perf_profiling_enabled(self) -> bool: return self._perf_profiling_enabled @perf_profiling_enabled.setter def perf_profiling_enabled(self, value: bool) -> None: self._perf_profiling_enabled = value if hasattr(self, 'perf_monitor'): self.perf_monitor.enabled = value @property def active_project_root(self) -> str: if self.active_project_path: return str(Path(self.active_project_path).parent) return self.ui_files_base_dir def _update_inject_preview(self) -> None: """ Updates the preview content based on the selected file and injection mode. [C: src/gui_2.py:App._render_text_viewer_window, tests/test_skeleton_injection.py:test_update_inject_preview_full, tests/test_skeleton_injection.py:test_update_inject_preview_skeleton, tests/test_skeleton_injection.py:test_update_inject_preview_truncation] """ if not self._inject_file_path: self._inject_preview = "" return target_path = self._inject_file_path if not os.path.isabs(target_path): target_path = os.path.join(self.active_project_root, target_path) if not os.path.exists(target_path): self._inject_preview = "" return try: with open(target_path, "r", encoding="utf-8") as f: content = f.read() if self._inject_mode == "skeleton" and target_path.endswith(".py"): parser = ASTParser("python") preview = parser.get_skeleton(content) else: preview = content lines = preview.splitlines() if len(lines) > 500: preview = "\n".join(lines[:500]) + "\n... (truncated)" self._inject_preview = preview except Exception as e: self._inject_preview = f"Error reading file: {e}" @property def ai_status(self) -> str: return self._ai_status @ai_status.setter def ai_status(self, value: str) -> None: self._ai_status = value @property def mma_status(self) -> str: return self._mma_status @mma_status.setter def mma_status(self, value: str) -> None: self._mma_status = value @property def thinking_indicator(self) -> bool: return self.ai_status in ("sending...", "streaming...") @property def summary_cache(self) -> Any: from src import summarize return summarize._summary_cache @property def rag_enabled(self) -> bool: return self.rag_config.enabled if self.rag_config else False def _sync_rag_engine(self): """ Re-initializes the RAG engine in a background thread to avoid blocking the UI. """ self._set_rag_status("initializing...") def _task(): try: from src import rag_engine engine = rag_engine.RAGEngine(self.rag_config, self.active_project_root) with self._rag_engine_lock: self.rag_engine = engine # If the engine is empty and we have files, trigger indexing if self.rag_engine and self.rag_engine.is_empty() and self.files: self._rebuild_rag_index() else: self._set_rag_status("ready") except Exception as e: self._set_rag_status(f"error: {e}") sys.stderr.write(f"[DEBUG RAG] Failed to sync engine: {e}\n") sys.stderr.flush() threading.Thread(target=_task, daemon=True).start() @property def rag_enabled(self) -> bool: return self.rag_config.enabled if self.rag_config else False @rag_enabled.setter def rag_enabled(self, value: bool) -> None: if self.rag_config: self.rag_config.enabled = value self._sync_rag_engine() @property def rag_source(self) -> str: return self.rag_config.vector_store.provider if self.rag_config else 'mock' @rag_source.setter def rag_source(self, value: str) -> None: if self.rag_config: self.rag_config.vector_store.provider = value self._sync_rag_engine() @property def rag_emb_provider(self) -> str: return self.rag_config.embedding_provider if self.rag_config else 'gemini' @rag_emb_provider.setter def rag_emb_provider(self, value: str) -> None: if self.rag_config: self.rag_config.embedding_provider = value self._sync_rag_engine() @property def rag_chunk_size(self) -> int: return self.rag_config.chunk_size if self.rag_config else 1000 @rag_chunk_size.setter def rag_chunk_size(self, value: int) -> None: if self.rag_config: self.rag_config.chunk_size = value @property def rag_chunk_overlap(self) -> int: return self.rag_config.chunk_overlap if self.rag_config else 200 @rag_chunk_overlap.setter def rag_chunk_overlap(self, value: int) -> None: if self.rag_config: self.rag_config.chunk_overlap = value @property def rag_mcp_server(self) -> str: return self.rag_config.vector_store.mcp_server or "" if self.rag_config else "" @rag_mcp_server.setter def rag_mcp_server(self, value: str) -> None: if self.rag_config: self.rag_config.vector_store.mcp_server = value @property def rag_mcp_tool(self) -> str: return self.rag_config.vector_store.mcp_tool or "" if self.rag_config else "" @rag_mcp_tool.setter def rag_mcp_tool(self, value: str) -> None: if self.rag_config: self.rag_config.vector_store.mcp_tool = value @property def rag_collection_name(self) -> str: return self.rag_config.vector_store.collection_name if self.rag_config else "manual_slop" @rag_collection_name.setter def rag_collection_name(self, value: str) -> None: if self.rag_config: self.rag_config.vector_store.collection_name = value self._sync_rag_engine() @property def mcp_config_json(self) -> str: return json.dumps(self.mcp_config.to_dict()) if self.mcp_config else "{}" @mcp_config_json.setter def mcp_config_json(self, value: str) -> None: try: data = json.loads(value) self.mcp_config = models.MCPConfiguration.from_dict(data) except: pass @property def ui_file_paths(self) -> list[str]: return [f.path if hasattr(f, 'path') else str(f) for f in self.files] @ui_file_paths.setter def ui_file_paths(self, value: list[str]) -> None: old_files = {f.path: f for f in self.files if hasattr(f, 'path')} new_files = [] import time now = time.time() for p in value: if p in old_files: new_files.append(old_files[p]) else: from src import models new_files.append(models.FileItem(path=p, injected_at=now)) self.files = new_files @property def operations_live_indicator(self) -> bool: return not self.is_viewing_prior_session @property def prior_session_indicator(self) -> bool: return self.is_viewing_prior_session def _init_actions(self) -> None: # Set up state-related action maps self._clickable_actions: dict[str, Callable[..., Any]] = { 'btn_reset': self._handle_reset_session, 'btn_gen_send': self._handle_generate_send, 'btn_md_only': self._handle_md_only, 'btn_approve_script': self._handle_approve_script, 'btn_reject_script': self._handle_reject_script, 'btn_project_save': self._cb_project_save, 'btn_disc_create': self._cb_disc_create, 'btn_mma_plan_epic': self._cb_plan_epic, 'btn_mma_accept_tracks': self._cb_accept_tracks, 'btn_mma_start_track': self._cb_start_track, 'btn_mma_create_track': lambda: self._cb_create_track(self.ui_new_track_name, self.ui_new_track_desc, self.ui_new_track_type), 'btn_approve_tool': self._handle_approve_ask, 'btn_approve_mma_step': lambda: self._handle_mma_respond(approved=True), 'btn_approve_spawn': lambda: self._handle_mma_respond(approved=True), 'btn_prune_logs': self.cb_prune_logs, 'btn_reset_base_prompt': self._cb_reset_base_prompt, 'btn_show_base_prompt_diff': self._cb_show_base_prompt_diff, 'btn_rebuild_rag_index': self._rebuild_rag_index, 'btn_clear_summary_cache': self._handle_clear_summary_cache, } self._drag_actions: dict[str, Callable[..., Any]] = {} self._right_clickable_actions: dict[str, Callable[..., Any]] = {} self._predefined_callbacks: dict[str, Callable[..., Any]] = { '_test_callback_func_write_to_file': self._test_callback_func_write_to_file, '_set_env_var': lambda k, v: os.environ.update({k: v}), '_set_attr': lambda k, v: setattr(self, k, v), '_apply_preset': self._apply_preset, '_cb_save_preset': self._cb_save_preset, '_cb_delete_preset': self._cb_delete_preset, '_cb_save_tool_preset': self._cb_save_tool_preset, '_cb_delete_tool_preset': self._cb_delete_tool_preset, '_switch_project': self._switch_project, '_refresh_from_project': self._refresh_from_project, 'save_workspace_profile': self._cb_save_workspace_profile, 'load_workspace_profile': self._cb_load_workspace_profile, 'delete_workspace_profile': self._cb_delete_workspace_profile, '_cb_create_track': self._cb_create_track, } def _update_gcli_adapter(self, path: str) -> None: if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=str(path)) else: ai_client._gemini_cli_adapter.binary_path = str(path) def _set_rag_status(self, status: str) -> None: """Thread-safe update of rag_status via the GUI task queue.""" with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "set_value", "item": "rag_status", "value": status }) def _rebuild_rag_index(self) -> None: """Background thread that re-indexes all files in the current project.""" if not self.rag_config or not self.rag_config.enabled or not self.rag_engine: return def _run(): try: self._set_rag_status("indexing...") import concurrent.futures # 1. Incremental indexing of current files in parallel with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [] def do_index(p): if self.rag_engine: self.rag_engine.index_file(p) for f in self.files: path = f.path if hasattr(f, "path") else str(f) futures.append(executor.submit(do_index, path)) concurrent.futures.wait(futures) # 2. Cleanup stale entries (files no longer tracked) indexed_paths = self.rag_engine.get_all_indexed_paths() current_paths = {f.path if hasattr(f, "path") else str(f) for f in self.files} stale_paths = [p for p in indexed_paths if p not in current_paths] if stale_paths: self.rag_engine.delete_documents_by_path(stale_paths) self._set_rag_status("ready") except Exception as e: self._set_rag_status(f"error: {e}") threading.Thread(target=_run, daemon=True).start() def _trigger_gui_refresh(self): with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'set_comms_dirty'}) self._pending_gui_tasks.append({'action': 'set_tool_log_dirty'}) def _process_pending_gui_tasks(self) -> None: """ Processes pending GUI tasks from the queue on the main render thread. [C: src/gui_2.py:App._render_main_interface, tests/test_api_hook_extensions.py:test_app_processes_new_actions, tests/test_gui_updates.py:test_gui_updates_on_event, tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_mma_orchestration_gui.py:test_handle_ai_response_fallback, tests/test_mma_orchestration_gui.py:test_handle_ai_response_with_stream_id, tests/test_mma_orchestration_gui.py:test_process_pending_gui_tasks_mma_spawn_approval, tests/test_mma_orchestration_gui.py:test_process_pending_gui_tasks_show_track_proposal, tests/test_process_pending_gui_tasks.py:test_gcli_path_updates_adapter, tests/test_process_pending_gui_tasks.py:test_process_pending_gui_tasks_drag, tests/test_process_pending_gui_tasks.py:test_process_pending_gui_tasks_right_click, tests/test_process_pending_gui_tasks.py:test_redundant_calls_in_process_pending_gui_tasks] """ now = time.time() if hasattr(self, 'event_queue') and hasattr(self.event_queue, 'websocket_server') and self.event_queue.websocket_server: if now - self._last_telemetry_time >= 1.0: self._last_telemetry_time = now metrics = self.perf_monitor.get_metrics() self.event_queue.websocket_server.broadcast("telemetry", metrics) if not self._pending_gui_tasks: return with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] self._pending_gui_tasks.clear() for task in tasks: try: action = task.get("action") or task.get("type") if action: session_logger.log_api_hook("PROCESS_TASK", action, str(task)) if action in self._gui_task_handlers: self._gui_task_handlers[action](self, task) except Exception as e: print(f"Error executing GUI task ({task.get('action') or task.get('type')}): {e}") traceback.print_exc() def _process_pending_history_adds(self) -> None: """ Synchronizes pending history entries to the active discussion and project state. [C: src/gui_2.py:App._render_main_interface] """ with self._pending_history_adds_lock: items = self._pending_history_adds[:] self._pending_history_adds.clear() if not items: return self._scroll_disc_to_bottom = True for item in items: item.get("role", "unknown") if item.get("role") and item["role"] not in self.disc_roles: self.disc_roles.append(item["role"]) disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) disc_data = discussions.get(self.active_discussion) if disc_data is not None: if item.get("disc_title", self.active_discussion) == self.active_discussion: if self.disc_entries is not disc_data.get("history"): if "history" not in disc_data: disc_data["history"] = [] disc_data["history"].append(project_manager.entry_to_str(item)) disc_data["last_updated"] = project_manager.now_ts() with self._disc_entries_lock: self.disc_entries.append(item) def _process_pending_tool_calls(self) -> bool: """ Drains pending tool calls into the tool log. Returns True if any were processed. [C: src/gui_2.py:App._render_main_interface] """ with self._pending_tool_calls_lock: items = self._pending_tool_calls[:] self._pending_tool_calls.clear() if not items: return False for item in items: self._append_tool_log( item.get("script", ""), item.get("result", ""), source_tier=item.get("source_tier") ) return True def _test_callback_func_write_to_file(self, data: str) -> None: """A dummy function that a custom_callback would execute for testing.""" callback_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "tests", "artifacts", "temp_callback_output.txt") os.makedirs(os.path.dirname(callback_path), exist_ok=True) with open(callback_path, "w") as f: f.write(data) def _handle_approve_script(self, user_data=None) -> None: """Approves the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = True dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def _handle_reject_script(self, user_data=None) -> None: """Rejects the currently pending PowerShell script.""" with self._pending_dialog_lock: dlg = self._pending_dialog if dlg: with dlg._condition: dlg._approved = False dlg._done = True dlg._condition.notify_all() self._pending_dialog = None def init_state(self): """ Initializes the application state from configurations. [C: src/gui_2.py:App.__init__, src/gui_2.py:App._save_paths, src/gui_2.py:App._set_external_editor_default, tests/test_app_controller_mcp.py:test_app_controller_mcp_loading, tests/test_app_controller_mcp.py:test_app_controller_mcp_project_override, tests/test_context_composition_decoupled.py:test_do_generate_uses_context_files, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call, tests/test_system_prompt_exposure.py:TestSystemPromptExposure.test_app_controller_init_state_loads_prompts] """ self.active_tickets = [] self.ui_separate_task_dag = False self.ui_separate_usage_analytics = False self.ui_separate_tier1 = False self.ui_separate_tier2 = False self.ui_separate_tier3 = False self.ui_separate_tier4 = False self.ui_separate_message_panel = False self.ui_separate_response_panel = False self.ui_separate_tool_calls_panel = False self.ui_separate_external_tools = False self.config = models.load_config() theme.load_from_config(self.config) ai_cfg = self.config.get("ai", {}) self._current_provider = ai_cfg.get("provider", "gemini") self._current_model = ai_cfg.get("model", "gemini-2.5-flash-lite") self.temperature = ai_cfg.get("temperature", 0.0) self.top_p = ai_cfg.get("top_p", 1.0) self.max_tokens = ai_cfg.get("max_tokens", 8192) self.history_trunc_limit = ai_cfg.get("history_trunc_limit", 8000) projects_cfg = self.config.get("projects", {}) self.project_paths = list(projects_cfg.get("paths", [])) self.active_project_path = projects_cfg.get("active", "") self._load_active_project() # Track 5: Path isolation overrides if self.project and 'paths' in self.project: project_root = Path(self.active_project_path).parent if self.active_project_path else Path.cwd() proj_paths = self.project.get('paths', {}) if 'logs_dir' in proj_paths: lpath = Path(proj_paths['logs_dir']) if not lpath.is_absolute(): lpath = project_root / lpath os.environ['SLOP_LOGS_DIR'] = str(lpath) if 'scripts_dir' in proj_paths: spath = Path(proj_paths['scripts_dir']) if not spath.is_absolute(): spath = project_root / spath os.environ['SLOP_SCRIPTS_DIR'] = str(spath) paths.reset_resolved() path_info = paths.get_full_path_info() self.ui_logs_dir = str(path_info['logs_dir']['path']) self.ui_scripts_dir = str(path_info['scripts_dir']['path']) if not self.project or not isinstance(self.project, dict) or "project" not in self.project: name = Path(self.active_project_path).stem if self.active_project_path else "unnamed" self.project = project_manager.default_project(name) self.workspace_manager = workspace_manager.WorkspaceManager(project_root=Path(self.active_project_path).parent if self.active_project_path else None) self.workspace_profiles = self.workspace_manager.load_all_profiles() # Deserialize FileItems in files.paths raw_paths = self.project.get("files", {}).get("paths", []) self.files = [] for p in raw_paths: if isinstance(p, models.FileItem): self.files.append(p) elif isinstance(p, dict): self.files.append(models.FileItem.from_dict(p)) else: self.files.append(models.FileItem(path=str(p))) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System", "Reasoning", "Context"])) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries[:] = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) # UI state self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_conductor_dir = self.project.get('conductor', {}).get('dir', 'conductor') self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self._update_gcli_adapter(self.ui_gemini_cli_path) self.ui_word_wrap = proj_meta.get("word_wrap", True) self.ui_auto_add_history = disc_sec.get("auto_add", False) self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "") self.ui_base_system_prompt = self.config.get("ai", {}).get("base_system_prompt", "") self.ui_use_default_base_prompt = self.config.get("ai", {}).get("use_default_base_prompt", True) self.ui_project_context_marker = proj_meta.get("context_marker", "") self.preset_manager = presets.PresetManager(Path(self.active_project_path).parent if self.active_project_path else None) self.presets = self.preset_manager.load_all() self.tool_preset_manager = tool_presets.ToolPresetManager(Path(self.active_project_path).parent if self.active_project_path else None) self.tool_presets = self.tool_preset_manager.load_all_presets() self.bias_profiles = self.tool_preset_manager.load_all_bias_profiles() mcp_path = self.project.get('project', {}).get('mcp_config_path') or self.config.get('ai', {}).get('mcp_config_path') if mcp_path: mcp_p = Path(mcp_path) if not mcp_p.is_absolute() and self.active_project_path: mcp_p = Path(self.active_project_path).parent / mcp_path if mcp_p.exists(): self.mcp_config = models.load_mcp_config(str(mcp_p)) else: self.mcp_config = models.MCPConfiguration() else: self.mcp_config = models.MCPConfiguration() rag_data = self.config.get('rag') if rag_data: self.rag_config = models.RAGConfig.from_dict(rag_data) else: self.rag_config = models.RAGConfig() self.rag_engine = None if self.rag_config.enabled: self._sync_rag_engine() from src.personas import PersonaManager self.persona_manager = PersonaManager(Path(self.active_project_path).parent if self.active_project_path else None) self.personas = self.persona_manager.load_all() self._fetch_models(self.current_provider) self.ui_active_tool_preset = os.environ.get('SLOP_TOOL_PRESET') or ai_cfg.get("active_tool_preset") self.ui_active_bias_profile = ai_cfg.get("active_bias_profile") ai_client.set_tool_preset(self.ui_active_tool_preset) ai_client.set_bias_profile(self.ui_active_bias_profile) self.ui_global_preset_name = ai_cfg.get("active_preset") self.ui_project_preset_name = proj_meta.get("active_preset") gui_cfg = self.config.get("gui", {}) self.ui_separate_message_panel = gui_cfg.get('separate_message_panel', False) self.ui_separate_response_panel = gui_cfg.get('separate_response_panel', False) self.ui_separate_tool_calls_panel = gui_cfg.get('separate_tool_calls_panel', False) self.ui_auto_switch_layout = gui_cfg.get("auto_switch_layout", False) self.ui_tier_layout_bindings = gui_cfg.get("tier_layout_bindings", {"Tier 1": "", "Tier 2": "", "Tier 3": "", "Tier 4": ""}) from src import bg_shader bg_shader.get_bg().enabled = gui_cfg.get("bg_shader_enabled", False) _default_windows = { "Project Settings": True, "Files & Media": True, "AI Settings": True, "MMA Dashboard": True, "Task DAG": False, "Usage Analytics": False, "Tier 1": False, "Tier 2": False, "Tier 3": False, "Tier 4": False, "Tier 1: Strategy": True, "Tier 2: Tech Lead": True, "Tier 3: Workers": True, "Tier 4: QA": True, "Discussion Hub": True, "Operations Hub": True, "Message": False, "Response": False, "Tool Calls": False, "Text Viewer": False, "Theme": True, "Log Management": False, } saved = self.config.get("gui", {}).get("show_windows", {}) self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()} agent_tools_cfg = self.project.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} label = self.project.get("project", {}).get("name", "") session_logger.reset_session(label=label) # Trigger auto-start of MCP servers self.event_queue.put('refresh_external_mcps', None) async def refresh_external_mcps(self): """ [C: tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call] """ await mcp_client.get_external_mcp_manager().stop_all() # Start servers with auto_start=True for name, cfg in self.mcp_config.mcpServers.items(): if cfg.auto_start: await mcp_client.get_external_mcp_manager().add_server(cfg) def cb_load_prior_log(self, path: Optional[str] = None) -> None: """ [C: src/gui_2.py:App._render_log_management, src/gui_2.py:App.cb_load_prior_log] """ if not path: return if not self.is_viewing_prior_session: self._current_session_usage = copy.deepcopy(self.session_usage) self._current_mma_tier_usage = copy.deepcopy(self.mma_tier_usage) self._current_token_history = copy.deepcopy(self._token_history) self._current_session_start_time = self._session_start_time log_path = Path(path) if log_path.is_dir(): log_file = log_path / "comms.log" session_dir = log_path else: log_file = log_path session_dir = log_path.parent if not log_file.exists(): self.ai_status = f"log file not found: {log_file}" return def _resolve_log_ref(content: Any, session_dir: Path) -> str: if not content or not isinstance(content, str) or "[REF:" not in content: return str(content) if content is not None else "" pattern = r'\[REF:([^\]]+)\]' def replace_ref(match): ref_file = match.group(1) paths_to_check = [ session_dir / "outputs" / ref_file, session_dir / "scripts" / ref_file ] for p in paths_to_check: if p.exists(): try: with open(p, "r", encoding="utf-8") as rf: return rf.read() except Exception: return f"[ERROR READING REF: {ref_file}]" return match.group(0) return re.sub(pattern, replace_ref, content) entries = [] disc_entries = [] paired_tools = {} final_tool_calls = [] new_token_history = [] new_usage = {'input_tokens': 0, 'output_tokens': 0, 'cache_read_input_tokens': 0, 'cache_creation_input_tokens': 0, 'total_tokens': 0, 'last_latency': 0.0, 'percentage': 0.0} new_mma_usage = copy.deepcopy(self.mma_tier_usage) for t in new_mma_usage: new_mma_usage[t]['input'] = 0 new_mma_usage[t]['output'] = 0 try: with open(log_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: try: entry = json.loads(line) entries.append(entry) kind = entry.get("kind", entry.get("type", "")) payload = entry.get("payload", {}) ts = entry.get("ts", "") if kind == 'tool_call': tid = payload.get('id') or payload.get('call_id') script = payload.get('script') or json.dumps(payload.get('args', {}), indent=1) script = _resolve_log_ref(script, session_dir) entry_obj = { 'source_tier': entry.get('source_tier', 'main'), 'script': script, 'result': '', # Waiting for result 'ts': ts } if tid: paired_tools[tid] = entry_obj final_tool_calls.append(entry_obj) elif kind == 'tool_result': tid = payload.get('id') or payload.get('call_id') output = payload.get('output', payload.get('content', '')) output = _resolve_log_ref(output, session_dir) if tid and tid in paired_tools: paired_tools[tid]['result'] = output else: # Fallback: if no ID, try matching last entry in final_tool_calls that has no result for old_call in reversed(final_tool_calls): if not old_call['result']: old_call['result'] = output break if kind == 'response' and 'usage' in payload: u = payload['usage'] for k in ['input_tokens', 'output_tokens', 'cache_read_input_tokens', 'cache_creation_input_tokens', 'total_tokens']: if k in new_usage: new_usage[k] += u.get(k, 0) or 0 tier = entry.get('source_tier', 'main') if tier in new_mma_usage: new_mma_usage[tier]['input'] += u.get('input_tokens', 0) or 0 new_mma_usage[tier]['output'] += u.get('output_tokens', 0) or 0 new_token_history.append({ 'time': ts, 'input': u.get('input_tokens', 0) or 0, 'output': u.get('output_tokens', 0) or 0, 'model': entry.get('model', 'unknown') }) if kind == "history_add": content = payload.get("content", payload.get("text", payload.get("message", ""))) content = _resolve_log_ref(content, session_dir) disc_entries.append({ "role": payload.get("role", "AI"), "content": content, "collapsed": payload.get("collapsed", False), "ts": ts }) elif kind == "request": content = payload.get("message", payload.get("content", payload.get("text", ""))) content = _resolve_log_ref(content, session_dir) disc_entries.append({ "role": "User", "content": content, "collapsed": False, "ts": ts }) elif kind == "response": text = payload.get("text", payload.get("content", payload.get("message", ""))) text = _resolve_log_ref(text, session_dir) tool_calls = payload.get("tool_calls", []) content = text if tool_calls: try: tc_str = json.dumps(tool_calls, indent=1) if content: content += f"\n\n[TOOL CALLS]\n{tc_str}" else: content = f"[TOOL CALLS]\n{tc_str}" except: if content: content += f"\n\n[TOOL CALLS PRESENT]" else: content = "[TOOL CALLS PRESENT]" disc_entries.append({ "role": "AI", "content": content, "collapsed": False, "ts": ts }) elif kind == "tool_result": output = payload.get("output", payload.get("content", "")) output = _resolve_log_ref(output, session_dir) disc_entries.append({ "role": "Tool", "content": f"[TOOL RESULT]\n{output}", "collapsed": True, "ts": ts }) except json.JSONDecodeError: continue except Exception as e: self.ai_status = f"log load error: {e}" return self.session_usage = new_usage self.mma_tier_usage = new_mma_usage self._token_history = new_token_history if new_token_history: try: import datetime first_ts = new_token_history[0]['time'] dt = datetime.datetime.strptime(first_ts, '%Y-%m-%dT%H:%M:%S') self._session_start_time = dt.timestamp() except: self._session_start_time = time.time() self.prior_session_entries = entries self.prior_disc_entries = disc_entries self.prior_tool_calls = final_tool_calls self.is_viewing_prior_session = True self._trigger_gui_refresh() self.ai_status = f"viewing prior session: {session_dir.name} ({len(entries)} entries)" def cb_exit_prior_session(self): """ [C: src/gui_2.py:App._render_comms_history_panel, src/gui_2.py:App._render_prior_session_view] """ self.is_viewing_prior_session = False if self._current_session_usage: self.session_usage = self._current_session_usage self._current_session_usage = None if self._current_mma_tier_usage: self.mma_tier_usage = self._current_mma_tier_usage self._current_mma_tier_usage = None if self._current_token_history is not None: self._token_history = self._current_token_history self._current_token_history = None if self._current_session_start_time is not None: self._session_start_time = self._current_session_start_time self._current_session_start_time = None self.prior_session_entries.clear() self.prior_disc_entries.clear() self.prior_tool_calls.clear() self._trigger_gui_refresh() self.ai_status = 'idle' def cb_prune_logs(self) -> None: """Manually triggers the log pruning process with aggressive thresholds.""" self.ai_status = "Manual prune started (Age > 0d, Size < 100KB)..." def run_manual_prune() -> None: try: from src import log_registry from src import log_pruner registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir())) # Aggressive: Prune anything not whitelisted, even if just created, if under 100KB # Note: max_age_days=0 means cutoff is NOW. pruner.prune(max_age_days=0, min_size_kb=100) self.ai_status = "Manual prune complete." except Exception as e: self.ai_status = f"Manual prune error: {e}" print(f"Error during manual log pruning: {e}") thread = threading.Thread(target=run_manual_prune, daemon=True) thread.start() def _load_active_project(self) -> None: """Loads the active project configuration, with fallbacks.""" if self.active_project_path and Path(self.active_project_path).exists(): try: self.project = project_manager.load_project(self.active_project_path) except Exception as e: print(f"Failed to load project {self.active_project_path}: {e}") self.project = project_manager.migrate_from_legacy_config(self.config) self.active_project_path = "" else: self.project = project_manager.migrate_from_legacy_config(self.config) self.active_project_path = "" if not self.active_project_path: for pp in self.project_paths: if Path(pp).exists(): try: self.project = project_manager.load_project(pp) self.active_project_path = pp break except Exception: continue if not self.active_project_path: name = self.project.get("project", {}).get("name", "project") fallback_path = f"{name}.toml" project_manager.save_project(self.project, fallback_path) self.active_project_path = fallback_path if fallback_path not in self.project_paths: self.project_paths.append(fallback_path) self.preset_manager = presets.PresetManager(Path(self.active_project_path).parent if self.active_project_path else None) self.tool_preset_manager = tool_presets.ToolPresetManager(Path(self.active_project_path).parent if self.active_project_path else None) self._refresh_from_project() self._configure_mcp_for_project() def _prune_old_logs(self) -> None: """Asynchronously prunes old insignificant logs on startup.""" def run_prune() -> None: try: from src import log_registry from src import log_pruner registry = log_registry.LogRegistry(str(paths.get_logs_dir() / "log_registry.toml")) pruner = log_pruner.LogPruner(registry, str(paths.get_logs_dir())) pruner.prune() except Exception as e: print(f"Error during log pruning: {e}") thread = threading.Thread(target=run_prune, daemon=True) thread.start() def _fetch_models(self, provider: str) -> None: """ [C: src/gui_2.py:App.run] """ self.ai_status = "fetching models..." def do_fetch() -> None: try: for p in models.PROVIDERS: try: self.all_available_models[p] = ai_client.list_models(p) except Exception as e: self.all_available_models[p] = [] models_list = self.all_available_models.get(provider, []) self.available_models = models_list if self.current_model not in models_list and models_list: self.current_model = models_list[0] ai_client.set_provider(self._current_provider, self.current_model) if self.ai_status == "fetching models...": self.ai_status = f"models loaded: {len(models_list)}" except Exception as e: if self.ai_status == "fetching models...": self.ai_status = f"model fetch error: {e}" self.models_thread = threading.Thread(target=do_fetch, daemon=True) self.models_thread.start() def start_services(self, app: Any = None): """ Starts background threads. [C: src/gui_2.py:App.__init__] """ self._prune_old_logs() self._init_ai_and_hooks(app) self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() def shutdown(self) -> None: """ Stops background threads and cleans up resources. [C: src/gui_2.py:App.run, src/gui_2.py:App.shutdown, tests/conftest.py:app_instance, tests/conftest.py:mock_app] """ from src import ai_client ai_client.cleanup() if hasattr(self, 'hook_server') and self.hook_server: self.hook_server.stop() self.event_queue.put("shutdown", None) if self._loop_thread and self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) def _init_ai_and_hooks(self, app: Any = None) -> None: from src import api_hooks ai_client.set_provider(self._current_provider, self._current_model) if self._current_provider == "gemini_cli": if not ai_client._gemini_cli_adapter: ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path) else: ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path ai_client.confirm_and_run_callback = self._confirm_and_run ai_client.set_comms_log_callback(self._on_comms_entry) ai_client.tool_log_callback = self._on_tool_log mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics self.perf_monitor.alert_callback = self._on_performance_alert ai_client.events.on("request_start", lambda **kw: self._on_api_event("request_start", **kw)) ai_client.events.on("response_received", lambda **kw: self._on_api_event("response_received", **kw)) ai_client.events.on("tool_execution", lambda **kw: self._on_api_event("tool_execution", **kw)) self.hook_server = api_hooks.HookServer(app if app else self) self.hook_server.start() def _run_event_loop(self): """Internal loop runner.""" def queue_fallback() -> None: while True: try: if hasattr(self, '_process_pending_gui_tasks'): self._process_pending_gui_tasks() if hasattr(self, '_process_pending_history_adds'): self._process_pending_history_adds() except: pass time.sleep(0.1) fallback_thread = threading.Thread(target=queue_fallback, daemon=True) fallback_thread.start() self._process_event_queue() def _process_event_queue(self) -> None: """Listens for and processes events from the SyncEventQueue.""" while True: event_name, payload = self.event_queue.get() if event_name == "shutdown": break if event_name == "user_request": threading.Thread(target=self._handle_request_event, args=(payload,), daemon=True).start() elif event_name == "gui_task": with self._pending_gui_tasks_lock: # Directly append the task from the hook server. # It already contains 'action' and any necessary fields. self._pending_gui_tasks.append(payload) elif event_name == "mma_state_update": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_state_update", "payload": payload }) elif event_name == "mma_stream": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "mma_stream_append", "payload": payload }) elif event_name in ("mma_spawn_approval", "mma_step_approval"): with self._pending_gui_tasks_lock: # These payloads already contain the 'action' field self._pending_gui_tasks.append(payload) elif event_name == "response": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": payload }) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": "response", "payload": payload}) elif event_name == "ticket_started": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "ticket_started", "payload": payload }) elif event_name == "ticket_completed": with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "ticket_completed", "payload": payload }) elif event_name == "refresh_external_mcps": import asyncio asyncio.run(self.refresh_external_mcps()) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """ Processes a UserRequestEvent by calling the AI client. [C: tests/test_live_gui_integration_v2.py:test_user_request_error_handling, tests/test_live_gui_integration_v2.py:test_user_request_integration_flow, tests/test_rag_integration.py:test_rag_integration] """ self.ai_status = 'sending...' user_msg = event.prompt # 1. RAG Retrieval (Enrich prompt before logging to history) if self.rag_engine and self.rag_config and self.rag_config.enabled: try: chunks = self.rag_engine.search(user_msg) if chunks: context_block = "## Retrieved Context\n\n" for i, chunk in enumerate(chunks): path = chunk.get("metadata", {}).get("path", "unknown") context_block += f"### Chunk {i+1} (Source: {path})\n{chunk.get('document', '')}\n\n" user_msg = context_block + user_msg except Exception as e: sys.stderr.write(f"RAG search error: {e}\n") sys.stderr.flush() # 2. Symbol Resolution (Enrich prompt before logging to history) try: symbols = parse_symbols(user_msg) file_paths = [f['path'] for f in event.file_items] for symbol in symbols: res = get_symbol_definition(symbol, file_paths) if res: file_path, definition, line = res user_msg += f'\n\n[Definition: {symbol} from {file_path} (line {line})]\n```python\n{definition}\n```' except Exception as e: sys.stderr.write(f"Symbol resolution error: {e}\n") sys.stderr.flush() # 3. Log the final enriched prompt to history self.event_queue.put("comms", { "kind": "request", "payload": { "message": user_msg, "collapsed": False } }) ai_client.set_current_tier(None) # Ensure main discussion is untagged # Clear response area for new turn self.ai_response = "" csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) custom_prompt = "\n\n".join(csp) ai_client.set_custom_system_prompt(custom_prompt) ai_client.set_base_system_prompt(self.ui_base_system_prompt) ai_client.set_use_default_base_prompt(self.ui_use_default_base_prompt) ai_client.set_project_context_marker(self.ui_project_context_marker) self.last_resolved_system_prompt = ai_client.get_combined_system_prompt() self.discussion_sent_markdown = event.stable_md self.discussion_sent_system_prompt = self.last_resolved_system_prompt ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit, self.top_p) ai_client.set_agent_tools(self.ui_agent_tools) # Force update adapter path right before send to bypass potential duplication issues self._update_gcli_adapter(self.ui_gemini_cli_path) try: resp = ai_client.send( event.stable_md, user_msg, event.base_dir, event.file_items, event.disc_text, stream=True, stream_callback=lambda text: self._on_ai_stream(text), pre_tool_callback=self._confirm_and_run, qa_callback=ai_client.run_tier4_analysis, patch_callback=ai_client.run_tier4_patch_callback, rag_engine=None # Already handled above ) self.event_queue.put("response", {"text": resp, "status": "done", "role": "AI"}) except ai_client.ProviderError as e: self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}) except Exception as e: self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}) def _offload_entry_payload(self, entry: Dict[str, Any]) -> Dict[str, Any]: optimized = copy.deepcopy(entry) kind = optimized.get("kind") payload = optimized.get("payload", {}) if kind == "tool_result" and "output" in payload: output = payload["output"] ref_path = session_logger.log_tool_output(output) if ref_path: filename = Path(ref_path).name payload["output"] = f"[REF:{filename}]" if kind == "tool_call" and "script" in payload: script = payload["script"] ref_path = session_logger.log_tool_call(script, "LOG_ONLY", None) if ref_path: filename = Path(ref_path).name payload["script"] = f"[REF:{filename}]" return optimized def _on_ai_stream(self, text: str) -> None: """Handles streaming text from the AI.""" self.event_queue.put("response", {"text": text, "status": "streaming...", "role": "AI"}) def _on_comms_entry(self, entry: Dict[str, Any]) -> None: """ [C: tests/test_app_controller_offloading.py:test_on_comms_entry_tool_result_offloading] """ optimized_entry = self._offload_entry_payload(entry) session_logger.log_comms(optimized_entry) entry["local_ts"] = time.time() kind = entry.get("kind") payload = entry.get("payload", {}) if kind == "response" and "usage" in payload: u = payload["usage"] inp = u.get("input_tokens") or u.get("prompt_tokens") or 0 out = u.get("output_tokens") or u.get("completion_tokens") or 0 cache_read = u.get("cache_read_input_tokens") or 0 cache_create = u.get("cache_creation_input_tokens") or 0 total = u.get("total_tokens") or 0 # Store normalized usage back in payload for history rendering u["input_tokens"] = inp u["output_tokens"] = out u["cache_read_input_tokens"] = cache_read self.session_usage["input_tokens"] += inp self.session_usage["output_tokens"] += out self.session_usage["cache_read_input_tokens"] += cache_read self.session_usage["cache_creation_input_tokens"] += cache_create self.session_usage["total_tokens"] += total input_t = u.get("input_tokens") or 0 output_t = u.get("output_tokens") or 0 model = payload.get("model", "unknown") self._token_history.append({ "time": time.time(), "input": input_t, "output": output_t, "model": model }) if kind == "request": if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": "User", "content": payload.get("message", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) if kind == "response": if self.ui_auto_add_history: role = payload.get("role", "AI") text_content = payload.get("text", "") if text_content.strip(): segments, parsed_response = thinking_parser.parse_thinking_trace(text_content) entry_obj = { "role": role, "content": parsed_response.strip() if parsed_response else "", "collapsed": True, "ts": entry.get("ts", project_manager.now_ts()) } if segments: entry_obj["thinking_segments"] = [{"content": s.content, "marker": s.marker} for s in segments] if entry_obj["content"] or segments: with self._pending_history_adds_lock: self._pending_history_adds.append(entry_obj) if kind in ("tool_result", "tool_call"): if self.ui_auto_add_history: role = "Tool" if kind == "tool_result" else "Vendor API" content = "" if kind == "tool_result": content = payload.get("output", "") else: content = payload.get("script") or payload.get("args") or payload.get("message", "") if isinstance(content, dict): content = json.dumps(content, indent=1) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": role, "content": f"[{kind.upper().replace('_', ' ')}]\n{content}", "collapsed": True, "ts": entry.get("ts", project_manager.now_ts()) }) if kind == "history_add": payload = entry.get("payload", {}) with self._pending_history_adds_lock: self._pending_history_adds.append({ "role": payload.get("role", "AI"), "content": payload.get("content", ""), "collapsed": payload.get("collapsed", False), "ts": entry.get("ts", project_manager.now_ts()) }) return with self._pending_comms_lock: self._pending_comms.append(entry) def _on_tool_log(self, script: str, result: str) -> None: """ [C: tests/test_app_controller_offloading.py:test_on_tool_log_offloading] """ session_logger.log_tool_call(script, result, None) session_logger.log_tool_output(result) source_tier = ai_client.get_current_tier() with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) def _on_api_event(self, event_name: str = "generic_event", **kwargs: Any) -> None: """ [C: tests/test_gui_updates.py:test_gui_updates_on_event] """ payload = kwargs.get("payload", {}) # Push to background event queue, NOT GUI queue self.event_queue.put("refresh_api_metrics", payload) if self.test_hooks_enabled: with self._api_event_queue_lock: self._api_event_queue.append({"type": event_name, "payload": payload}) def _on_performance_alert(self, message: str) -> None: self.diagnostic_log.append({ "ts": project_manager.now_ts(), "message": message, "type": "performance" }) def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]: """ [C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch] """ if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False): self.ai_status = "running powershell..." output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) self.ai_status = "powershell done, awaiting AI..." return output dialog = ConfirmDialog(script, base_dir) is_headless = "--headless" in sys.argv if is_headless: with self._pending_dialog_lock: self._pending_actions[dialog._uid] = dialog else: with self._pending_dialog_lock: self._pending_dialog = dialog if self.test_hooks_enabled and hasattr(self, '_api_event_queue'): with self._api_event_queue_lock: self._api_event_queue.append({ "type": "script_confirmation_required", "action_id": dialog._uid, "script": str(script), "base_dir": str(base_dir), "ts": time.time() }) approved, final_script = dialog.wait() if is_headless: with self._pending_dialog_lock: if dialog._uid in self._pending_actions: del self._pending_actions[dialog._uid] if not approved: self._append_tool_log(final_script, "REJECTED by user") return None self.ai_status = "running powershell..." output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback, patch_callback=patch_callback) self._append_tool_log(final_script, output) self.ai_status = "powershell done, awaiting AI..." return output def _append_tool_log(self, script: str, result: str, source_tier: str | None = None, elapsed_ms: float = 0.0) -> None: """ [C: tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_has_source_tier, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_dict_keys, tests/test_mma_agent_focus_phase1.py:test_append_tool_log_stores_dict] """ self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) tool_name = self._extract_tool_name(script) is_failure = "REJECTED" in result or "Error" in result or "error" in result.lower() if tool_name: if tool_name not in self._tool_stats: self._tool_stats[tool_name] = {"count": 0, "total_time_ms": 0.0, "failures": 0} self._tool_stats[tool_name]["count"] += 1 self._tool_stats[tool_name]["total_time_ms"] += elapsed_ms if is_failure: self._tool_stats[tool_name]["failures"] += 1 self.ui_last_script_text = script self.ui_last_script_output = result self._trigger_script_blink = True self.show_script_output = True if self.ui_auto_scroll_tool_calls: self._scroll_tool_calls_to_bottom = True def _extract_tool_name(self, script: str) -> str: if not script: return "unknown" script_lower = script.lower() if "powershell" in script_lower or "run_powershell" in script_lower: return "run_powershell" if "read_file" in script_lower: return "read_file" if "write_file" in script_lower or "write" in script_lower: return "write_file" if "list_directory" in script_lower or "ls" in script_lower: return "list_directory" if "search_files" in script_lower or "glob" in script_lower: return "search_files" if "web_search" in script_lower: return "web_search" if "fetch_url" in script_lower: return "fetch_url" if "py_get" in script_lower: return "py_get_skeleton" return "other" def resolve_pending_action(self, action_id: str, approved: bool) -> bool: with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True elif self._pending_dialog and self._pending_dialog._uid == action_id: dialog = self._pending_dialog with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True return False @property def _pending_mma_spawn(self) -> Optional[Dict[str, Any]]: return self._pending_mma_spawns[0] if self._pending_mma_spawns else None @property def _pending_mma_approval(self) -> Optional[Dict[str, Any]]: return self._pending_mma_approvals[0] if self._pending_mma_approvals else None @property def current_provider(self) -> str: return self._current_provider @current_provider.setter def current_provider(self, value: str) -> None: if value != self._current_provider: self._current_provider = value ai_client.reset_session() ai_client.set_provider(value, self.current_model) self.available_models = self.all_available_models.get(value, []) if not self.available_models: self._fetch_models(value) self._token_stats = {} self._token_stats_dirty = True @property def current_model(self) -> str: return self._current_model @current_model.setter def current_model(self, value: str) -> None: if value != self._current_model: self._current_model = value ai_client.reset_session() ai_client.set_provider(self.current_provider, value) self._token_stats = {} self._token_stats_dirty = True def create_api(self) -> FastAPI: """ Creates and configures the FastAPI application for headless mode. [SDM: src/app_controller.py:AppController.create_api] [C: src/gui_2.py:App.run, tests/test_headless_service.py:TestHeadlessAPI.setUp] """ api = FastAPI(title="Manual Slop Headless API") API_KEY_NAME = "X-API-KEY" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(header_key: str = Depends(api_key_header)) -> str: return await _api_get_key(self, header_key) @api.get("/health") def health() -> dict[str, str]: return _api_health(self) @api.get("/api/gui/state", dependencies=[Depends(get_api_key)]) def get_gui_state() -> dict[str, Any]: """ [C: tests/test_ai_settings_layout.py:test_change_provider_via_hook, tests/test_ai_settings_layout.py:test_set_params_via_custom_callback, tests/test_conductor_api_hook_integration.py:simulate_conductor_phase_completion, tests/test_external_editor_gui.py:test_button_click_is_received, tests/test_external_editor_gui.py:test_patch_modal_shows_with_configured_editor, tests/test_external_editor_gui.py:test_vscode_launches_with_diff_view, tests/test_gui_text_viewer.py:test_text_viewer_state_update, tests/test_hooks.py:test_live_hook_server_responses, tests/test_live_gui_integration_v2.py:test_api_gui_state_live, tests/test_live_workflow.py:test_full_live_workflow, tests/test_live_workflow.py:wait_for_value, tests/test_patch_modal_gui.py:test_patch_apply_modal_workflow, tests/test_patch_modal_gui.py:test_patch_modal_appears_on_trigger, tests/test_rag_phase4_final_verify.py:test_phase4_final_verify, tests/test_rag_phase4_stress.py:test_rag_large_codebase_verification_sim, tests/test_saved_presets_sim.py:test_preset_manager_modal, tests/test_saved_presets_sim.py:test_preset_switching, tests/test_task_dag_popout_sim.py:test_task_dag_popout, tests/test_tool_management_layout.py:test_tool_management_gettable_fields, tests/test_tool_management_layout.py:test_tool_management_state_updates, tests/test_tool_presets_sim.py:test_tool_preset_switching, tests/test_usage_analytics_popout_sim.py:test_usage_analytics_popout, tests/test_visual_mma.py:test_visual_mma_components] """ return _api_get_gui_state(self) @api.get("/api/gui/mma_status", dependencies=[Depends(get_api_key)]) def get_mma_status() -> dict[str, Any]: """ [C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation, tests/test_live_workflow.py:test_full_live_workflow, tests/test_mma_concurrent_tracks_sim.py:_poll_mma_status, tests/test_mma_concurrent_tracks_sim.py:test_mma_concurrent_tracks_execution, tests/test_mma_concurrent_tracks_stress_sim.py:test_mma_concurrent_tracks_stress, tests/test_mma_step_mode_sim.py:_poll_mma_status, tests/test_mma_step_mode_sim.py:test_mma_step_mode_approval_flow, tests/test_visual_mma.py:test_visual_mma_components, tests/test_visual_orchestration.py:test_mma_epic_lifecycle, tests/test_visual_sim_gui_ux.py:test_gui_ux_event_routing, tests/test_visual_sim_mma_v2.py:_poll] """ return _api_get_mma_status(self) @api.post("/api/gui", dependencies=[Depends(get_api_key)]) def post_gui(req: dict) -> dict[str, str]: """ [C: tests/test_ai_settings_layout.py:test_set_params_via_custom_callback, tests/test_api_hook_client.py:test_post_gui_success, tests/test_gui2_parity.py:test_gui2_custom_callback_hook_works, tests/test_gui2_parity.py:test_gui2_set_value_hook_works] """ return _api_post_gui(self, req) @api.get("/api/session", dependencies=[Depends(get_api_key)]) def get_api_session() -> dict[str, Any]: return _api_get_api_session(self) @api.post("/api/session", dependencies=[Depends(get_api_key)]) def post_api_session(req: dict) -> dict[str, str]: return _api_post_api_session(self, req) @api.get("/api/project", dependencies=[Depends(get_api_key)]) def get_api_project() -> dict[str, Any]: return _api_get_api_project(self) @api.get("/api/performance", dependencies=[Depends(get_api_key)]) def get_performance() -> dict[str, Any]: """ [C: tests/test_gui2_performance.py:test_performance_benchmarking, tests/test_gui_performance_requirements.py:test_idle_performance_requirements, tests/test_gui_stress_performance.py:test_comms_volume_stress_performance, tests/test_selectable_ui.py:test_selectable_label_stability] """ return _api_get_performance(self) @api.get("/api/gui/diagnostics", dependencies=[Depends(get_api_key)]) def get_diagnostics() -> dict[str, Any]: return _api_get_diagnostics(self) @api.get("/status", dependencies=[Depends(get_api_key)]) def status() -> dict[str, Any]: return _api_status(self) @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) def generate(req: GenerateRequest) -> dict[str, Any]: return _api_generate(self, req) @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) async def stream(req: GenerateRequest) -> Any: return await _api_stream(self, req) @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) def pending_actions() -> list[dict[str, Any]]: return _api_pending_actions(self) @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]: return _api_confirm_action(self, action_id, req) @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) def list_sessions() -> list[str]: return _api_list_sessions(self) @api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def get_session(session_id: str) -> dict[str, Any]: """ [C: simulation/ping_pong.py:main, simulation/sim_context.py:ContextSimulation.run, simulation/sim_execution.py:ExecutionSimulation.run, simulation/sim_tools.py:ToolsSimulation.run, simulation/workflow_sim.py:WorkflowSimulator.run_discussion_turn_async, simulation/workflow_sim.py:WorkflowSimulator.wait_for_ai_response, tests/test_api_hook_client.py:test_get_session_success, tests/test_gui_stress_performance.py:test_comms_volume_stress_performance, tests/test_live_workflow.py:test_full_live_workflow, tests/test_rag_phase4_final_verify.py:test_phase4_final_verify, tests/test_rag_phase4_stress.py:test_rag_large_codebase_verification_sim] """ return _api_get_session(self, session_id) @api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)]) def delete_session(session_id: str) -> dict[str, str]: return _api_delete_session(self, session_id) @api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) def get_context() -> dict[str, Any]: """ [C: src/fuzzy_anchor.py:FuzzyAnchor.create_slice] """ return _api_get_context(self) @api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)]) def token_stats() -> dict[str, Any]: return _api_token_stats(self) return api def _cb_new_project_automated(self, user_data: Any) -> None: if user_data: name = Path(user_data).stem proj = project_manager.default_project(name) project_manager.save_project(proj, user_data) if user_data not in self.project_paths: self.project_paths.append(user_data) self._switch_project(user_data) def _cb_project_save(self) -> None: self._flush_to_project() self._flush_to_config() models.save_config(self.config) self.ai_status = "config saved" def _cb_reset_base_prompt(self, user_data=None) -> None: """ [C: src/gui_2.py:App._render_system_prompts_panel] """ self.ui_base_system_prompt = ai_client._SYSTEM_PROMPT self.ui_use_default_base_prompt = False def _cb_clear_summary_cache(self, user_data=None) -> None: """ [C: src/gui_2.py:App._render_files_panel] """ from src import summarize summarize._summary_cache.clear() self._push_mma_state_update() def _handle_clear_summary_cache(self, user_data: Any = None) -> None: self.summary_cache.clear() self.ai_status = 'summary cache cleared' def _cb_show_base_prompt_diff(self, user_data=None) -> None: """ [C: src/gui_2.py:App._render_system_prompts_panel] """ self._show_base_prompt_diff_modal = True def _cb_disc_create(self) -> None: nm = self.ui_disc_new_name_input.strip() if nm: self._create_discussion(nm) self.ui_disc_new_name_input = "" def _configure_mcp_for_project(self) -> None: if not self.active_project_path: return project_root = Path(self.active_project_path).parent file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files] mcp_client.configure(file_items_as_dicts, [str(project_root)]) def _switch_project(self, path: str) -> None: """ [C: src/gui_2.py:App._render_projects_panel] """ if path == self.active_project_path: return if not Path(path).exists(): self.ai_status = f"project file not found: {path}" return self._flush_to_project() try: self.project = project_manager.load_project(path) self.active_project_path = path new_root = Path(path).parent self.preset_manager = presets.PresetManager(new_root) self.tool_preset_manager = tool_presets.ToolPresetManager(new_root) from src.personas import PersonaManager self.persona_manager = PersonaManager(new_root) except Exception as e: self.ai_status = f"failed to load project: {e}" return self._refresh_from_project() file_items_as_dicts = [{"path": f.path if hasattr(f, "path") else str(f)} for f in self.files] mcp_client.configure(file_items_as_dicts, [str(new_root)]) self.ai_status = f"switched to: {Path(path).stem}" def _refresh_from_project(self) -> None: # Deserialize FileItems in files.paths """ [C: tests/test_mma_dashboard_refresh.py:test_mma_dashboard_initialization_refresh, tests/test_mma_dashboard_refresh.py:test_mma_dashboard_refresh, tests/test_view_presets.py:test_load_presets_from_project_legacy_dict, tests/test_view_presets.py:test_load_presets_from_project_list] """ raw_paths = self.project.get("files", {}).get("paths", []) self.files = [] for p in raw_paths: if isinstance(p, models.FileItem): self.files.append(p) elif isinstance(p, dict): self.files.append(models.FileItem.from_dict(p)) else: self.files.append(models.FileItem(path=str(p))) self.screenshots = list(self.project.get("screenshots", {}).get("paths", [])) disc_sec = self.project.get("discussion", {}) self.disc_roles = list(disc_sec.get("roles", ["User", "AI", "Vendor API", "System"])) self.active_discussion = disc_sec.get("active", "main") disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {}) with self._disc_entries_lock: self.disc_entries[:] = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) proj = self.project self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen") self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".") self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".") proj_meta = self.project.get("project", {}) self.ui_project_git_dir = proj_meta.get("git_dir", "") self.ui_project_system_prompt = proj_meta.get("system_prompt", "") self.ui_project_preset_name = proj_meta.get("active_preset") self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini") self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False) self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True) self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True) self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True) agent_tools_cfg = proj.get("agent", {}).get("tools", {}) self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in models.AGENT_TOOL_NAMES} # MMA Tracks self.tracks = project_manager.get_all_tracks(self.active_project_root) # Restore MMA state mma_sec = proj.get("mma", {}) self.ui_epic_input = mma_sec.get("epic", "") tier_models = mma_sec.get("tier_models", {}) for tier, data in tier_models.items(): if tier in self.mma_tier_usage: self.mma_tier_usage[tier]["model"] = data.get("model", self.mma_tier_usage[tier]["model"]) self.mma_tier_usage[tier]["provider"] = data.get("provider", self.mma_tier_usage[tier]["provider"]) self.mma_tier_usage[tier]["tool_preset"] = data.get("tool_preset", self.mma_tier_usage[tier].get("tool_preset")) at_data = mma_sec.get("active_track") if at_data: try: tickets = [] for t_data in at_data.get("tickets", []): tickets.append(models.Ticket(**t_data)) self.active_track = models.Track( id=at_data.get("id"), description=at_data.get("description"), tickets=tickets ) self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table except Exception as e: print(f"Failed to deserialize active track: {e}") self.active_track = None else: self.active_track = None self.active_tickets = [] # Load track-scoped history if track is active if self.active_track: track_history = project_manager.load_track_history(self.active_track.id, self.active_project_root) if track_history: with self._disc_entries_lock: self.disc_entries[:] = models.parse_history_entries(track_history, self.disc_roles) self.preset_manager.project_root = Path(self.active_project_root) self.presets = self.preset_manager.load_all() self.tool_preset_manager.project_root = Path(self.active_project_root) self.tool_presets = self.tool_preset_manager.load_all_presets() self.bias_profiles = self.tool_preset_manager.load_all_bias_profiles() raw_presets = proj.get("view_presets", []) if isinstance(raw_presets, dict): self.view_presets = [models.NamedViewPreset.from_dict({"name": name, **data}) for name, data in raw_presets.items()] else: self.view_presets = [models.NamedViewPreset.from_dict(p) for p in raw_presets if isinstance(p, dict)] if self.rag_config and self.rag_config.enabled: self._rebuild_rag_index() def _cb_save_workspace_profile(self, name: str, scope: str = 'project') -> None: """ [C: src/gui_2.py:App._render_save_workspace_profile_modal] """ if not hasattr(self, '_app') or not self._app: return profile = self._app._capture_workspace_profile(name) self.workspace_manager.save_profile(profile, scope=scope) self.workspace_profiles = self.workspace_manager.load_all_profiles() self._app.workspace_profiles = self.workspace_profiles def _cb_delete_workspace_profile(self, name: str, scope: str = 'project') -> None: """ [C: src/gui_2.py:App._show_menus] """ self.workspace_manager.delete_profile(name, scope=scope) self.workspace_profiles = self.workspace_manager.load_all_profiles() if hasattr(self, '_app') and self._app: self._app.workspace_profiles = self.workspace_profiles def _cb_load_workspace_profile(self, name: str) -> None: """ [C: src/gui_2.py:App._show_menus] """ if name in self.workspace_profiles: profile = self.workspace_profiles[name] if hasattr(self, '_app') and self._app: self._app._apply_workspace_profile(profile) def _apply_preset(self, name: str, scope: str) -> None: """ [C: src/gui_2.py:App._render_system_prompts_panel] """ if name == "None": if scope == "global": self.ui_global_preset_name = "" else: self.ui_project_preset_name = "" return preset = self.presets.get(name) if not preset: return if scope == "global": self.ui_global_system_prompt = preset.system_prompt self.ui_global_preset_name = name else: self.ui_project_system_prompt = preset.system_prompt self.ui_project_preset_name = name def _cb_save_preset(self, name, content, scope): """ [C: src/gui_2.py:App._render_preset_manager_content] """ if not name or not name.strip(): raise ValueError("Preset name cannot be empty or whitespace.") preset = models.Preset( name=name, system_prompt=content ) self.preset_manager.save_preset(preset, scope) self.presets = self.preset_manager.load_all() def _cb_delete_preset(self, name, scope): """ [C: src/gui_2.py:App._render_preset_manager_content] """ self.preset_manager.delete_preset(name, scope) self.presets = self.preset_manager.load_all() def _cb_save_tool_preset(self, name, categories, scope): """ [C: src/gui_2.py:App._render_tool_preset_manager_content] """ preset = models.ToolPreset(name=name, categories=categories) self.tool_preset_manager.save_preset(preset, scope) self.tool_presets = self.tool_preset_manager.load_all_presets() def _cb_delete_tool_preset(self, name, scope): """ [C: src/gui_2.py:App._render_tool_preset_manager_content] """ self.tool_preset_manager.delete_preset(name, scope) self.tool_presets = self.tool_preset_manager.load_all_presets() def _cb_save_bias_profile(self, profile: models.BiasProfile, scope: str = "project"): """ [C: src/gui_2.py:App._render_tool_preset_manager_content] """ self.tool_preset_manager.save_bias_profile(profile, scope) self.bias_profiles = self.tool_preset_manager.load_all_bias_profiles() def _cb_delete_bias_profile(self, name: str, scope: str = "project"): self.tool_preset_manager.delete_bias_profile(name, scope) self.bias_profiles = self.tool_preset_manager.load_all_bias_profiles() def _cb_save_persona(self, persona: models.Persona, scope: str = "project") -> None: """ [C: src/gui_2.py:App._render_persona_editor_window] """ self.persona_manager.save_persona(persona, scope) self.personas = self.persona_manager.load_all() def _cb_delete_persona(self, name: str, scope: str = "project") -> None: """ [C: src/gui_2.py:App._render_persona_editor_window] """ self.persona_manager.delete_persona(name, scope) self.personas = self.persona_manager.load_all() def _cb_save_view_preset(self, name: str, f_item: models.FileItem) -> None: """ [C: src/gui_2.py:App._render_context_files_table, tests/test_view_presets.py:test_save_view_preset] """ preset = models.NamedViewPreset( name=name, view_mode=f_item.view_mode, ast_mask=copy.deepcopy(f_item.ast_mask) if hasattr(f_item, "ast_mask") else {}, custom_slices=copy.deepcopy(f_item.custom_slices) if hasattr(f_item, "custom_slices") else [] ) for i, vp in enumerate(self.view_presets): if vp.name == name: self.view_presets[i] = preset break else: self.view_presets.append(preset) self._flush_to_project() def _cb_apply_view_preset(self, name: str, f_item: models.FileItem) -> None: """ [C: src/gui_2.py:App._render_context_files_table, tests/test_view_presets.py:test_apply_view_preset] """ preset = next((vp for vp in self.view_presets if vp.name == name), None) if preset: f_item.view_mode = preset.view_mode f_item.ast_mask = copy.deepcopy(preset.ast_mask) f_item.custom_slices = copy.deepcopy(preset.custom_slices) def _cb_delete_view_preset(self, name: str) -> None: """ [C: tests/test_view_presets.py:test_delete_view_preset] """ self.view_presets = [vp for vp in self.view_presets if vp.name != name] self._flush_to_project() def save_context_preset(self, preset: models.ContextPreset) -> None: self.context_preset_manager.save_preset(self.project, preset) self._save_active_project() def load_context_preset(self, name: str) -> models.ContextPreset: presets = self.context_preset_manager.load_all(self.project) if name not in presets: raise KeyError(f"Context preset '{name}' not found.") preset = presets[name] # Update only temporary context state, not project files import copy self.context_files = [] for f in preset.files: fi = models.FileItem(path=f.path, view_mode=f.view_mode) fi.custom_slices = copy.deepcopy(f.custom_slices) if hasattr(f, 'custom_slices') else [] fi.ast_mask = copy.deepcopy(f.ast_mask) if hasattr(f, 'ast_mask') else {} fi.ast_signatures = getattr(f, 'ast_signatures', False) fi.ast_definitions = getattr(f, 'ast_definitions', False) self.context_files.append(fi) self.screenshots = list(preset.screenshots) return preset def _cb_load_track(self, track_id: str) -> None: """ [C: src/gui_2.py:App._render_mma_track_browser] """ state = project_manager.load_track_state(track_id, self.active_project_root) if state: try: # Convert list[Ticket] or list[dict] to list[Ticket] for Track object tickets = [] for t in state.tasks: if isinstance(t, dict): tickets.append(models.Ticket(**t)) else: tickets.append(t) self.active_track = models.Track( id=state.metadata.id, description=state.metadata.name, tickets=tickets ) # Keep dicts for UI table self._load_active_tickets() # Load track-scoped history history = project_manager.load_track_history(track_id, self.active_project_root) with self._disc_entries_lock: if history: self.disc_entries[:] = models.parse_history_entries(history, self.disc_roles) else: self.disc_entries.clear() self._recalculate_session_usage() self.ai_status = f"Loaded track: {state.metadata.name}" except Exception as e: self.ai_status = f"Load track error: {e}" print(f"Error loading track {track_id}: {e}") def _save_active_project(self) -> None: """ [C: src/gui_2.py:App.delete_context_preset, src/gui_2.py:App.save_context_preset] """ if self.active_project_path: try: cleaned = project_manager.clean_nones(self.project) project_manager.save_project(cleaned, self.active_project_path) except Exception as e: self.ai_status = f"save error: {e}" def _get_discussion_names(self) -> list[str]: """ [C: src/gui_2.py:App._render_discussion_selector, src/gui_2.py:App._render_theme_panel] """ disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) return sorted(discussions.keys()) def _switch_discussion(self, name: str) -> None: """ [C: src/gui_2.py:App._render_discussion_selector, src/gui_2.py:App._render_takes_panel, src/gui_2.py:App._render_theme_panel] """ self._flush_disc_entries_to_project() disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if name not in discussions: self.ai_status = f"discussion not found: {name}" return self.active_discussion = name self._force_tab_selection = True self._track_discussion_active = False disc_sec["active"] = name disc_data = discussions[name] with self._disc_entries_lock: self.disc_entries[:] = models.parse_history_entries(disc_data.get("history", []), self.disc_roles) self.discussion_sent_markdown = disc_data.get("sent_markdown", "") self.discussion_sent_system_prompt = disc_data.get("sent_system_prompt", "") if "context_snapshot" in disc_data: snapshot_data = disc_data["context_snapshot"] self.context_files = [models.FileItem.from_dict(f) if isinstance(f, dict) else models.FileItem(path=str(f)) for f in snapshot_data] if self._app: self._app.ui_selected_context_files = {f.path for f in self.context_files if f.auto_aggregate} self.ai_status = f"discussion: {name}" def _flush_disc_entries_to_project(self) -> None: """ [C: src/gui_2.py:App._render_discussion_selector, src/gui_2.py:App._render_theme_panel] """ history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries] if self.active_track and self._track_discussion_active: project_manager.save_track_history(self.active_track.id, history_strings, self.active_project_root) return disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion()) disc_data["history"] = history_strings disc_data["last_updated"] = project_manager.now_ts() disc_data["context_snapshot"] = [f.to_dict() if hasattr(f, "to_dict") else {"path": str(f)} for f in self.context_files] disc_data["sent_markdown"] = getattr(self, "discussion_sent_markdown", "") disc_data["sent_system_prompt"] = getattr(self, "discussion_sent_system_prompt", "") def _create_discussion(self, name: str) -> None: """ [C: src/gui_2.py:App._render_discussion_metadata, src/gui_2.py:App._render_synthesis_panel, src/gui_2.py:App._render_takes_panel] """ disc_sec = self.project.setdefault("discussion", {}) discussions = disc_sec.setdefault("discussions", {}) if name in discussions: self.ai_status = f"discussion '{name}' already exists" return new_disc = project_manager.default_discussion() # Inherit context from current session if available if self.context_files: new_disc["context_snapshot"] = [f.to_dict() if hasattr(f, 'to_dict') else f for f in self.context_files] discussions[name] = new_disc self._switch_discussion(name) def _branch_discussion(self, index: int) -> None: """ [C: src/gui_2.py:App._render_discussion_entry] """ self._flush_disc_entries_to_project() # Generate a unique branch name base_name = self.active_discussion.split("_take_")[0] counter = 1 new_name = f"{base_name}_take_{counter}" disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) while new_name in discussions: counter += 1 new_name = f"{base_name}_take_{counter}" project_manager.branch_discussion(self.project, self.active_discussion, new_name, index) self._switch_discussion(new_name) def _rename_discussion(self, old_name: str, new_name: str) -> None: """ [C: src/gui_2.py:App._render_discussion_metadata] """ disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if old_name not in discussions: return if new_name in discussions: self.ai_status = f"discussion '{new_name}' already exists" return discussions[new_name] = discussions.pop(old_name) if self.active_discussion == old_name: self.active_discussion = new_name disc_sec["active"] = new_name def _delete_discussion(self, name: str) -> None: """ [C: src/gui_2.py:App._render_discussion_metadata] """ disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) if len(discussions) <= 1: self.ai_status = "cannot delete the last discussion" return if name not in discussions: return del discussions[name] if self.active_discussion == name: remaining = sorted(discussions.keys()) self._switch_discussion(remaining[0]) def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None: """ [C: src/gui_2.py:App._handle_approve_mma_step, src/gui_2.py:App._handle_approve_spawn, src/gui_2.py:App._render_mma_modals] """ if self._pending_mma_approvals: task = self._pending_mma_approvals.pop(0) dlg = task.get("dialog_container", [None])[0] if dlg: with dlg._condition: dlg._approved = approved if payload is not None: dlg._payload = payload dlg._done = True dlg._condition.notify_all() elif self._pending_mma_spawns: task = self._pending_mma_spawns.pop(0) spawn_dlg = task.get("dialog_container", [None])[0] if spawn_dlg: with spawn_dlg._condition: spawn_dlg._approved = approved spawn_dlg._abort = abort if prompt is not None: spawn_dlg._prompt = prompt if context_md is not None: spawn_dlg._context_md = context_md spawn_dlg._done = True spawn_dlg._condition.notify_all() def _handle_approve_ask(self) -> None: """ Responds with approval for a pending /api/ask request. [C: src/gui_2.py:App._handle_approve_ask, src/gui_2.py:App._render_mma_modals] """ if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": True}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reject_ask(self) -> None: """ Responds with rejection for a pending /api/ask request. [C: src/gui_2.py:App._render_mma_modals] """ if not self._ask_request_id: return request_id = self._ask_request_id def do_post() -> None: try: requests.post( "http://127.0.0.1:8999/api/ask/respond", json={"request_id": request_id, "response": {"approved": False}}, timeout=2 ) except Exception as e: print(f"Error responding to ask: {e}") threading.Thread(target=do_post, daemon=True).start() self._pending_ask_dialog = False self._ask_request_id = None self._ask_tool_data = None def _handle_reset_session(self) -> None: """ Logic for resetting the AI session and GUI state. [C: src/gui_2.py:App._render_message_panel] """ ai_client.reset_session() ai_client.clear_comms_log() self._tool_log.clear() self._tool_stats.clear() self._comms_log.clear() self.disc_entries.clear() self.discussion_sent_markdown = "" self.discussion_sent_system_prompt = "" self.files.clear() self.context_files.clear() self.tracks.clear() # Clear history in ALL discussions to be safe disc_sec = self.project.get("discussion", {}) discussions = disc_sec.get("discussions", {}) for d_name in discussions: discussions[d_name]["history"] = [] self.ai_status = "session reset" self.ai_response = "" self.ui_ai_input = "" self.ui_manual_approve = False self.ui_auto_add_history = False self.active_track = None self.active_tier = None self.mma_status = 'idle' self.proposed_tracks = [] self.active_tickets = [] self.engines.clear() self.mma_streams.clear() self._worker_status.clear() self._current_provider = "gemini" self._current_model = "gemini-2.5-flash-lite" ai_client.set_provider(self._current_provider, self._current_model) with self._pending_history_adds_lock: self._pending_history_adds.clear() with self._api_event_queue_lock: self._api_event_queue.clear() with self._pending_gui_tasks_lock: self._pending_gui_tasks.clear() self.ui_use_default_base_prompt = True self.ui_global_system_prompt = '' self.ui_base_system_prompt = '' self.ui_project_system_prompt = '' self.ui_active_persona = '' self.ui_active_tool_preset = None self.ui_active_bias_profile = None self.temperature = 0.0 self.top_p = 1.0 self.max_tokens = 8192 def _handle_md_only(self) -> None: """ Logic for the 'MD Only' action. [C: src/gui_2.py:App._render_message_panel] """ def worker(): """ [C: tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] """ try: md, path, *_ = self._do_generate() self.last_md = md self.last_md_path = path self.ai_status = f"md written: {path.name}" # Refresh token budget metrics with CURRENT md self._refresh_api_metrics({}, md_content=md) except Exception as e: self.ai_status = f"error: {e}" threading.Thread(target=worker, daemon=True).start() def _handle_generate_send(self) -> None: """ Logic for the 'Gen + Send' action. [C: src/gui_2.py:App._render_message_panel, src/gui_2.py:App._render_synthesis_panel, src/gui_2.py:App._render_takes_panel, tests/test_gui_events_v2.py:test_handle_generate_send_pushes_event, tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] """ def worker(): """ [C: tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols] """ try: md, path, file_items, stable_md, disc_text = self._do_generate() self._last_stable_md = stable_md self.last_md = md self.last_md_path = path self.last_file_items = file_items self.ai_status = "sending..." user_msg = self.ui_ai_input base_dir = self.active_project_root # Prepare event payload event_payload = events.UserRequestEvent( prompt=user_msg, stable_md=stable_md, file_items=file_items, disc_text=disc_text, base_dir=base_dir ) # Push to async queue self.event_queue.put("user_request", event_payload) except Exception as e: self.ai_status = f"generate error: {e}" threading.Thread(target=worker, daemon=True).start() def _recalculate_session_usage(self) -> None: usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0, "percentage": self.session_usage.get("percentage", 0.0)} for entry in ai_client.get_comms_log(): if entry.get("kind") == "response" and "usage" in entry.get("payload", {}): u = entry["payload"]["usage"] for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]: if k in usage: usage[k] += u.get(k, 0) or 0 self.session_usage = usage # Update cached files list stats = ai_client.get_gemini_cache_stats() self._cached_files = stats.get("cached_files", []) def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None: """ [C: tests/test_gui_updates.py:test_telemetry_data_updates_correctly] """ if "latency" in payload: self.session_usage["last_latency"] = payload["latency"] if "usage" in payload and "percentage" in payload["usage"]: self.session_usage["percentage"] = payload["usage"]["percentage"] self._recalculate_session_usage() if md_content is not None: stats = ai_client.get_token_stats(md_content) # Ensure compatibility if keys are named differently if "total_tokens" in stats and "estimated_prompt_tokens" not in stats: stats["estimated_prompt_tokens"] = stats["total_tokens"] self._token_stats = stats cache_stats = payload.get("cache_stats") if cache_stats: count = cache_stats.get("cache_count", 0) size_bytes = cache_stats.get("total_size_bytes", 0) self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)" self._update_cached_stats() def _update_cached_stats(self) -> None: from src import ai_client self._cached_cache_stats = ai_client.get_gemini_cache_stats() self._cached_tool_stats = dict(self._tool_stats) def clear_cache(self) -> None: """ [C: src/gui_2.py:App._render_cache_panel] """ from src import ai_client ai_client.cleanup() self._update_cached_stats() def get_session_insights(self) -> Dict[str, Any]: """ [C: src/gui_2.py:App._render_session_insights_panel] """ from src import cost_tracker total_input = sum(e["input"] for e in self._token_history) total_output = sum(e["output"] for e in self._token_history) total_tokens = total_input + total_output elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0 burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0 session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output) completed = sum(1 for t in self.active_tickets if t.get("status") == "complete") efficiency = total_tokens / completed if completed > 0 else 0 return { "total_tokens": total_tokens, "total_input": total_input, "total_output": total_output, "elapsed_min": elapsed_min, "burn_rate": burn_rate, "session_cost": session_cost, "completed_tickets": completed, "efficiency": efficiency, "call_count": len(self._token_history) } def _flush_to_project(self) -> None: """ [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._show_menus, tests/test_view_presets.py:test_save_view_preset] """ proj = self.project proj.setdefault("output", {})["output_dir"] = self.ui_output_dir proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir proj["files"]["paths"] = self.files proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir proj["screenshots"]["paths"] = self.screenshots proj.setdefault("project", {}) proj["project"]["git_dir"] = self.ui_project_git_dir proj.setdefault("conductor", {})["dir"] = self.ui_project_conductor_dir proj["project"]["system_prompt"] = self.ui_project_system_prompt proj["project"]["active_preset"] = self.ui_project_preset_name proj["project"]["word_wrap"] = self.ui_word_wrap proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path proj.setdefault("agent", {}).setdefault("tools", {}) for t_name in models.AGENT_TOOL_NAMES: proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True) self._flush_disc_entries_to_project() disc_sec = proj.setdefault("discussion", {}) disc_sec["roles"] = self.disc_roles disc_sec["active"] = self.active_discussion disc_sec["auto_add"] = self.ui_auto_add_history proj["view_presets"] = [vp.to_dict() for vp in self.view_presets] # Save MMA State mma_sec = proj.setdefault("mma", {}) mma_sec["epic"] = self.ui_epic_input mma_sec["tier_models"] = {t: {"model": d["model"], "provider": d.get("provider", "gemini"), "tool_preset": d.get("tool_preset")} for t, d in self.mma_tier_usage.items()} if self.active_track: mma_sec["active_track"] = asdict(self.active_track) else: mma_sec["active_track"] = None cleaned_proj = project_manager.clean_nones(proj) project_manager.save_project(cleaned_proj, self.active_project_path) def _flush_to_config(self) -> None: """ [C: src/gui_2.py:App._render_discussion_entry_controls, src/gui_2.py:App._render_main_interface, src/gui_2.py:App._render_projects_panel, src/gui_2.py:App._render_theme_panel, src/gui_2.py:App._show_menus, tests/test_system_prompt_exposure.py:TestSystemPromptExposure.test_app_controller_flush_saves_prompts] """ self.config["ai"] = { "provider": self.current_provider, "model": self.current_model, "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens, "history_trunc_limit": self.history_trunc_limit, "active_preset": self.ui_global_preset_name, } self.config["ai"]["system_prompt"] = self.ui_global_system_prompt self.config["ai"]["base_system_prompt"] = self.ui_base_system_prompt self.config["ai"]["use_default_base_prompt"] = self.ui_use_default_base_prompt if self.rag_config: self.config["rag"] = self.rag_config.to_dict() self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path} from src import bg_shader # Update gui section while preserving other keys like bg_shader_enabled gui_cfg = self.config.get("gui", {}) gui_cfg.update({ "show_windows": self.show_windows, "separate_message_panel": getattr(self, "ui_separate_message_panel", False), "separate_response_panel": getattr(self, "ui_separate_response_panel", False), "separate_tool_calls_panel": getattr(self, "ui_separate_tool_calls_panel", False), "separate_external_tools": getattr(self, "ui_separate_external_tools", False), "separate_task_dag": self.ui_separate_task_dag, "separate_usage_analytics": self.ui_separate_usage_analytics, "separate_tier1": self.ui_separate_tier1, "separate_tier2": self.ui_separate_tier2, "separate_tier3": self.ui_separate_tier3, "separate_tier4": self.ui_separate_tier4, "bg_shader_enabled": bg_shader.get_bg().enabled }) self.config["gui"] = gui_cfg # Explicitly save theme state into the config dict theme.save_to_config(self.config) def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]: """ Returns (full_md, output_path, file_items, stable_md, discussion_text). [C: src/gui_2.py:App._show_menus, tests/test_context_composition_decoupled.py:test_do_generate_uses_context_files, tests/test_tiered_aggregation.py:test_app_controller_do_generate_uses_persona_strategy] """ self._flush_to_project() self._flush_to_config() models.save_config(self.config) track_id = self.active_track.id if self.active_track else None flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id) import copy flat["files"] = copy.copy(flat.get("files", {})) flat["files"]["paths"] = self.context_files import os file_dicts = [] for f in self.context_files: p = f.path if hasattr(f, 'path') else str(f) if not os.path.isabs(p): p = os.path.join(self.ui_files_base_dir, p) file_dicts.append({"path": p}) mcp_client.configure(file_dicts, [self.ui_files_base_dir]) persona = self.personas.get(self.ui_active_persona) strategy = persona.aggregation_strategy if persona else "auto" full_md, path, file_items = aggregate.run(flat, aggregation_strategy=strategy) # Build stable markdown (no history) for Gemini caching screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", ".")) screenshots = flat.get("screenshots", {}).get("paths", []) summary_only = flat.get("project", {}).get("summary_only", False) stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only, aggregation_strategy=strategy) # Build discussion history text separately history = flat.get("discussion", {}).get("history", []) discussion_text = aggregate.build_discussion_text(history) csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()]) ai_client.set_custom_system_prompt("\n\n".join(csp)) ai_client.set_base_system_prompt(self.ui_base_system_prompt) ai_client.set_use_default_base_prompt(self.ui_use_default_base_prompt) ai_client.set_project_context_marker(self.ui_project_context_marker) self.last_resolved_system_prompt = ai_client.get_combined_system_prompt() self.last_aggregate_markdown = full_md return full_md, path, file_items, stable_md, discussion_text def _cb_plan_epic(self) -> None: """ [C: src/gui_2.py:App._render_mma_epic_planner, tests/test_mma_orchestration_gui.py:test_cb_plan_epic_launches_thread] """ def _bg_task() -> None: try: self.ai_status = "Planning Epic (Tier 1)..." history = orchestrator_pm.get_track_history_summary() proj = project_manager.load_project(self.active_project_path) flat = project_manager.flat_config(self.project) flat.setdefault("files", {})["paths"] = self.context_files file_items = aggregate.build_file_items(Path(self.active_project_root), flat.get("files", {}).get("paths", [])) _t1_baseline = len(ai_client.get_comms_log()) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) _t1_new = ai_client.get_comms_log()[_t1_baseline:] _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp) _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp) def _push_t1_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 1"]["input"] += i self.mma_tier_usage["Tier 1"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t1_usage, "args": [_t1_in, _t1_out] }) self._pending_gui_tasks.append({ "action": "handle_ai_response", "payload": { "text": json.dumps(tracks, indent=2), "stream_id": "Tier 1", "status": "Epic tracks generated." } }) self._pending_gui_tasks.append({ "action": "show_track_proposal", "payload": tracks }) except Exception as e: self.ai_status = f"Epic plan error: {e}" print(f"ERROR in _cb_plan_epic background task: {e}") threading.Thread(target=_bg_task, daemon=True).start() def _cb_accept_tracks(self) -> None: """ [C: src/gui_2.py:App._render_track_proposal_modal] """ self._show_track_proposal_modal = False def _bg_task() -> None: # Generate skeletons once self.ai_status = "Phase 2: Generating skeletons for all tracks..." parser = ASTParser(language="python") generated_skeletons = "" try: # Use a local copy of files to avoid concurrent modification issues files_to_scan = list(self.files) for i, f_item in enumerate(files_to_scan): try: self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(files_to_scan)})..." f_path = f_item.path if hasattr(f_item, 'path') else str(f_item) abs_path = Path(self.active_project_root) / f_path if abs_path.exists() and abs_path.suffix == ".py": with open(abs_path, "r", encoding="utf-8") as f: code = f.read() generated_skeletons += f"\nFile: {f_path}\n{parser.get_skeleton(code)}\n" except Exception as e: pass except Exception as e: self.ai_status = f"Error generating skeletons: {e}" return # Exit if skeleton generation fails # Now loop through tracks and call _start_track_logic with generated skeletons total_tracks = len(self.proposed_tracks) print(f"[DEBUG] _cb_accept_tracks: Starting {total_tracks} tracks...") for i, track_data in enumerate(self.proposed_tracks): title = track_data.get("title") or track_data.get("goal", "Untitled Track") self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..." self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons print(f"[DEBUG] _cb_accept_tracks: All {total_tracks} tracks processed.") with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started self.ai_status = f"All {total_tracks} tracks accepted and execution started." threading.Thread(target=_bg_task, daemon=True).start() def _cb_start_track(self, user_data: Any = None) -> None: """ [C: src/gui_2.py:App._render_track_proposal_modal] """ if isinstance(user_data, str): # If track_id is provided directly track_id = user_data # Ensure it's loaded as active if not self.active_track or self.active_track.id != track_id: self._cb_load_track(track_id) if self.active_track and self.active_track.id == track_id: # Use the active track object directly to start execution self.mma_status = "running" engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) self.engines[self.active_track.id] = engine flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) full_md, _, _ = aggregate.run(flat) threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() self.ai_status = f"Track '{self.active_track.description}' started." elif self.active_track and self.active_track.id != track_id: # load_track failed but active_track is still wrong - reload explicitly self._cb_load_track(track_id) if self.active_track and self.active_track.id == track_id: self.mma_status = "running" engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode) self.engines[self.active_track.id] = engine flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id) full_md, _, _ = aggregate.run(flat) threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() self.ai_status = f"Track '{self.active_track.description}' started." return idx = 0 if isinstance(user_data, int): idx = user_data elif isinstance(user_data, dict): idx = user_data.get("index", 0) if 0 <= idx < len(self.proposed_tracks): track_data = self.proposed_tracks[idx] title = track_data.get("title") or track_data.get("goal", "Untitled Track") threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start() self.ai_status = f"Track '{title}' started." def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None: try: goal = track_data.get("goal", "") title = track_data.get("title") or track_data.get("goal", "Untitled Track") self.ai_status = f"Phase 2: Generating tickets for {title}..." skeletons = skeletons_str or "" # Use provided skeletons or empty self.ai_status = "Phase 2: Calling Tech Lead..." _t2_baseline = len(ai_client.get_comms_log()) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) _t2_new = ai_client.get_comms_log()[_t2_baseline:] _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"] _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp) _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp) def _push_t2_usage(i: int, o: int) -> None: self.mma_tier_usage["Tier 2"]["input"] += i self.mma_tier_usage["Tier 2"]["output"] += o with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({ "action": "custom_callback", "callback": _push_t2_usage, "args": [_t2_in, _t2_out] }) if not raw_tickets: self.ai_status = f"Error: No tickets generated for track: {title}" print(f"Warning: No tickets generated for track: {title}") return self.ai_status = "Phase 2: Sorting tickets..." try: sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) except ValueError as e: print(f"Dependency error in track '{title}': {e}") sorted_tickets_data = raw_tickets # 3. Create Track and Ticket objects tickets = [] for t_data in sorted_tickets_data: ticket = models.Ticket( id=t_data["id"], description=t_data.get("description") or t_data.get("goal", "No description"), status=t_data.get("status", "todo"), assigned_to=t_data.get("assigned_to", "unassigned"), depends_on=t_data.get("depends_on", []), step_mode=t_data.get("step_mode", False) ) tickets.append(ticket) track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}" track = models.Track(id=track_id, description=title, tickets=tickets) # Initialize track state in the filesystem meta = models.Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now()) state = models.TrackState(metadata=meta, discussion=[], tasks=tickets) project_manager.save_track_state(track_id, state, self.active_project_root) # Add to memory and notify UI self.tracks.append({"id": track_id, "title": title, "status": "todo"}) with self._pending_gui_tasks_lock: self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # 4. Initialize ConductorEngine and run loop sys.stderr.write(f"[DEBUG] _start_track_logic: Initializing engine for {track_id}...\n") sys.stderr.flush() engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode) self.engines[track.id] = engine # Use current full markdown context for the track execution track_id_param = track.id flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param) flat.setdefault("files", {})["paths"] = self.context_files sys.stderr.write(f"[DEBUG] _start_track_logic: Aggregating context for {track_id}...\n") sys.stderr.flush() full_md, _, _ = aggregate.run(flat) sys.stderr.write(f"[DEBUG] _start_track_logic: Starting engine thread for {track_id}...\n") sys.stderr.flush() # Start the engine in a separate thread threading.Thread(target=engine.run, kwargs={"md_content": full_md}, daemon=True).start() sys.stderr.write(f"[DEBUG] _start_track_logic: Engine thread spawned for {track_id}.\n") sys.stderr.flush() except Exception as e: self.ai_status = f"Track start error: {e}" print(f"ERROR in _start_track_logic: {e}") def _cb_ticket_retry(self, ticket_id: str) -> None: """ [C: tests/test_mma_ticket_actions.py:test_cb_ticket_retry] """ for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'todo' break self.event_queue.put("mma_retry", {"ticket_id": ticket_id}) def _cb_ticket_skip(self, ticket_id: str) -> None: """ [C: tests/test_mma_ticket_actions.py:test_cb_ticket_skip] """ for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'skipped' break self.event_queue.put("mma_skip", {"ticket_id": ticket_id}) def _spawn_worker(self, ticket_id: str, data: dict = None) -> None: """Manually initiates a sub-agent execution for a ticket.""" engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: for t in self.active_track.tickets: if t.id == ticket_id: t.status = "todo" t.step_mode = False break engine.engine.auto_queue = True self.event_queue.put("mma_retry", {"ticket_id": ticket_id}) def kill_worker(self, worker_id: str) -> None: """ Aborts a running worker. [C: src/gui_2.py:App._cb_kill_ticket, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread] """ engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: engine.kill_worker(worker_id) def pause_mma(self) -> None: """Pauses the global MMA loop.""" self.mma_step_mode = True engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: engine.pause() def resume_mma(self) -> None: """Resumes the global MMA loop.""" self.mma_step_mode = False engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: engine.resume() def inject_context(self, data: dict) -> None: """ Programmatic context injection. [C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation] """ file_path = data.get("file_path") if file_path: if not os.path.isabs(file_path): file_path = os.path.relpath(file_path, self.active_project_root) existing = next((f for f in self.files if (f.path if hasattr(f, "path") else str(f)) == file_path), None) if not existing: item = models.FileItem(path=file_path) self.files.append(item) self._refresh_from_project() def approve_ticket(self, ticket_id: str) -> None: """Manually approves a ticket for execution.""" engine = self.engines.get(self.active_track.id if self.active_track else None) if engine and engine.engine: engine.engine.approve_task(ticket_id) else: # Fallback if engine not running for t in self.active_tickets: if t.get('id') == ticket_id: t['status'] = 'in_progress' break self._push_mma_state_update() def mutate_dag(self, data: dict) -> None: """Modifies task dependencies.""" ticket_id = data.get("ticket_id") depends_on = data.get("depends_on") if ticket_id and depends_on is not None: for t in self.active_tickets: if t.get("id") == ticket_id: t["depends_on"] = depends_on break if self.active_track: for t in self.active_track.tickets: if t.id == ticket_id: t.depends_on = depends_on break engine = self.engines.get(self.active_track.id if self.active_track else None) if engine: from src.dag_engine import TrackDAG, ExecutionEngine engine.dag = TrackDAG(self.active_track.tickets) engine.engine = ExecutionEngine(engine.dag, auto_queue=engine.engine.auto_queue) self._push_mma_state_update() def _cb_run_conductor_setup(self) -> None: """ [C: src/gui_2.py:App._render_mma_conductor_setup, tests/test_gui_phase3.py:test_conductor_setup_scan] """ base = paths.get_conductor_dir(project_path=self.active_project_root) if not base.exists(): self.ui_conductor_setup_summary = f"Error: {base}/ directory not found." return files = list(base.glob("**/*")) files = [f for f in files if f.is_file()] summary = [f"Conductor Directory: {base.absolute()}"] summary.append(f"Total Files: {len(files)}") total_lines = 0 for f in files: try: with open(f, "r", encoding="utf-8") as fd: lines = len(fd.readlines()) total_lines += lines summary.append(f"- {f.relative_to(base)}: {lines} lines") except Exception: summary.append(f"- {f.relative_to(base)}: Error reading") summary.append(f"Total Line Count: {total_lines}") tracks_dir = base / "tracks" if tracks_dir.exists(): tracks = [d for d in tracks_dir.iterdir() if d.is_dir()] summary.append(f"Total Tracks Found: {len(tracks)}") else: summary.append("Tracks Directory: Not found") self.ui_conductor_setup_summary = "\n".join(summary) def _cb_create_track(self, name: str, desc: str, track_type: str) -> None: """ [C: src/gui_2.py:App._render_mma_track_browser, tests/test_gui_phase3.py:test_create_track] """ if not name: return date_suffix = datetime.now().strftime("%Y%m%d") track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}" track_dir = paths.get_track_state_dir(track_id, project_path=self.active_project_root) track_dir.mkdir(parents=True, exist_ok=True) spec_file = track_dir / "spec.md" with open(spec_file, "w", encoding="utf-8") as f: f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n") plan_file = track_dir / "plan.md" with open(plan_file, "w", encoding="utf-8") as f: f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n") meta_file = track_dir / "metadata.json" with open(meta_file, "w", encoding="utf-8") as f: json.dump({ "id": track_id, "title": name, "description": desc, "type": track_type, "status": "new", "progress": 0.0 }, f, indent=1) # Refresh tracks from disk self.tracks = project_manager.get_all_tracks(self.active_project_root) def _push_mma_state_update(self) -> None: """ [C: src/gui_2.py:App._cb_block_ticket, src/gui_2.py:App._cb_unblock_ticket, src/gui_2.py:App._render_mma_ticket_editor, src/gui_2.py:App._render_task_dag_panel, src/gui_2.py:App._render_ticket_queue, src/gui_2.py:App._reorder_ticket, src/gui_2.py:App.bulk_block, src/gui_2.py:App.bulk_execute, src/gui_2.py:App.bulk_skip, tests/test_gui_phase4.py:test_push_mma_state_update] """ if not self.active_track: return # Sync active_tickets (list of dicts) back to active_track.tickets (list of models.Ticket objects) self.active_track.tickets = [models.Ticket.from_dict(t) for t in self.active_tickets] # Save the state to disk existing = project_manager.load_track_state(self.active_track.id, self.active_project_root) meta = models.Metadata( id=self.active_track.id, name=self.active_track.description, status=self.mma_status, created_at=existing.metadata.created_at if existing else datetime.now(), updated_at=datetime.now() ) state = models.TrackState( metadata=meta, discussion=existing.discussion if existing else [], tasks=self.active_track.tickets ) project_manager.save_track_state(self.active_track.id, state, self.active_project_root) def _load_active_tickets(self) -> None: """ Populates self.active_tickets based on the current execution mode. [C: tests/test_gui_dag_beads.py:test_load_active_tickets_from_beads] """ if getattr(self, "ui_project_execution_mode", "native") == "beads": from src import beads_client bclient = beads_client.BeadsClient(Path(self.active_project_root)) beads = bclient.list_beads() self.active_tickets = [] for b in beads: self.active_tickets.append({ "id": b.id, "title": b.title, "description": b.description, "status": b.status, "assigned_to": "tier3-worker", "target_file": "", "depends_on": [] }) else: if self.active_track: self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in self.active_track.tickets] else: self.active_tickets = []