Files
manual_slop/src/app_controller.py
Ed_ 2d92674aa0 fix(controller): Add stop_services() and dialog imports for GUI decoupling
- Add AppController.stop_services() to clean up AI client and event loop
- Add ConfirmDialog, MMAApprovalDialog, MMASpawnApprovalDialog imports to gui_2.py
- Fix test mocks for MMA dashboard and approval indicators
- Add retry logic to conftest.py for Windows file lock cleanup
2026-03-04 20:16:16 -05:00

1543 lines
59 KiB
Python

import asyncio
import threading
import time
import sys
import os
from typing import Any, List, Dict, Optional, Tuple, Callable
from pathlib import Path
import json
import uuid
import tomli_w
import requests
from dataclasses import asdict
from datetime import datetime
from tkinter import filedialog, Tk
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from src import events
from src import session_logger
from src import project_manager
from src.performance_monitor import PerformanceMonitor
from src.models import Track, Ticket, load_config, parse_history_entries, DISC_ROLES, AGENT_TOOL_NAMES, CONFIG_PATH
from src.log_registry import LogRegistry
from src.log_pruner import LogPruner
from src.file_cache import ASTParser
import ai_client
import shell_runner
import mcp_client
import aggregate
import orchestrator_pm
import conductor_tech_lead
import cost_tracker
import multi_agent_conductor
from src import theme
from ai_client import ProviderError
def save_config(config: dict[str, Any]) -> None:
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(config, f)
def hide_tk_root() -> Tk:
root = Tk()
root.withdraw()
root.wm_attributes("-topmost", True)
return root
class GenerateRequest(BaseModel):
prompt: str
auto_add_history: bool = True
temperature: float | None = None
max_tokens: int | None = None
class ConfirmRequest(BaseModel):
approved: bool
script: Optional[str] = None
class ConfirmDialog:
def __init__(self, script: str, base_dir: str) -> None:
self._uid = str(uuid.uuid4())
self._script = str(script) if script is not None else ""
self._base_dir = str(base_dir) if base_dir is not None else ""
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return self._approved, self._script
class MMAApprovalDialog:
def __init__(self, ticket_id: str, payload: str) -> None:
self._payload = payload
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return self._approved, self._payload
class MMASpawnApprovalDialog:
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None:
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
self._done = False
self._approved = False
self._abort = False
def wait(self) -> dict[str, Any]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return {
'approved': self._approved,
'abort': self._abort,
'prompt': self._prompt,
'context_md': self._context_md
}
class AppController:
"""
The headless controller for the Manual Slop application.
Owns the application state and manages background services.
"""
def __init__(self):
# Initialize locks first to avoid initialization order issues
self._send_thread_lock: threading.Lock = threading.Lock()
self._disc_entries_lock: threading.Lock = threading.Lock()
self._pending_comms_lock: threading.Lock = threading.Lock()
self._pending_tool_calls_lock: threading.Lock = threading.Lock()
self._pending_history_adds_lock: threading.Lock = threading.Lock()
self._pending_gui_tasks_lock: threading.Lock = threading.Lock()
self._pending_dialog_lock: threading.Lock = threading.Lock()
self._api_event_queue_lock: threading.Lock = threading.Lock()
self.config: Dict[str, Any] = {}
self.project: Dict[str, Any] = {}
self.active_project_path: str = ""
self.project_paths: List[str] = []
self.active_discussion: str = "main"
self.disc_entries: List[Dict[str, Any]] = []
self.disc_roles: List[str] = []
self.files: List[str] = []
self.screenshots: List[str] = []
self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue()
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_thread: Optional[threading.Thread] = None
self.tracks: List[Dict[str, Any]] = []
self.active_track: Optional[Track] = None
self.active_tickets: List[Dict[str, Any]] = []
self.mma_streams: Dict[str, str] = {}
self.mma_status: str = "idle"
self._tool_log: List[Dict[str, Any]] = []
self._comms_log: List[Dict[str, Any]] = []
self.session_usage: Dict[str, Any] = {
"input_tokens": 0,
"output_tokens": 0,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0,
"last_latency": 0.0
}
self.mma_tier_usage: Dict[str, Dict[str, Any]] = {
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"},
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
}
self.perf_monitor: PerformanceMonitor = PerformanceMonitor()
self._pending_gui_tasks: List[Dict[str, Any]] = []
self._api_event_queue: List[Dict[str, Any]] = []
# Pending dialogs state moved from App
self._pending_dialog: Optional[ConfirmDialog] = None
self._pending_dialog_open: bool = False
self._pending_actions: Dict[str, ConfirmDialog] = {}
self._pending_ask_dialog: bool = False
# AI settings state
self._current_provider: str = "gemini"
self._current_model: str = "gemini-2.5-flash-lite"
self.temperature: float = 0.0
self.max_tokens: int = 8192
self.history_trunc_limit: int = 8000
# UI-related state moved to controller
self.ui_ai_input: str = ""
self.ui_disc_new_name_input: str = ""
self.ui_disc_new_role_input: str = ""
self.ui_epic_input: str = ""
self.ui_new_track_name: str = ""
self.ui_new_track_desc: str = ""
self.ui_new_track_type: str = "feature"
self.ui_conductor_setup_summary: str = ""
self.ui_last_script_text: str = ""
self.ui_last_script_output: str = ""
self.ui_new_ticket_id: str = ""
self.ui_new_ticket_desc: str = ""
self.ui_new_ticket_target: str = ""
self.ui_new_ticket_deps: str = ""
self.ui_output_dir: str = ""
self.ui_files_base_dir: str = ""
self.ui_shots_base_dir: str = ""
self.ui_project_git_dir: str = ""
self.ui_project_main_context: str = ""
self.ui_project_system_prompt: str = ""
self.ui_gemini_cli_path: str = "gemini"
self.ui_word_wrap: bool = True
self.ui_summary_only: bool = False
self.ui_auto_add_history: bool = False
self.ui_global_system_prompt: str = ""
self.ui_agent_tools: Dict[str, bool] = {}
self.available_models: List[str] = []
self.proposed_tracks: List[Dict[str, Any]] = []
self._show_track_proposal_modal: bool = False
self.ai_status: str = 'idle'
self.ai_response: str = ''
self.last_md: str = ''
self.last_md_path: Optional[Path] = None
self.last_file_items: List[Any] = []
self.send_thread: Optional[threading.Thread] = None
self.models_thread: Optional[threading.Thread] = None
self.show_windows: Dict[str, bool] = {}
self.show_script_output: bool = False
self.show_text_viewer: bool = False
self.text_viewer_title: str = ''
self.text_viewer_content: str = ''
self._pending_comms: List[Dict[str, Any]] = []
self._pending_tool_calls: List[Dict[str, Any]] = []
self._pending_history_adds: List[Dict[str, Any]] = []
self.perf_history: Dict[str, List[float]] = {'frame_time': [0.0]*100, 'fps': [0.0]*100, 'cpu': [0.0]*100, 'input_lag': [0.0]*100}
self._perf_last_update: float = 0.0
self._autosave_interval: float = 60.0
self._last_autosave: float = time.time()
# More state moved from App
self._ask_dialog_open: bool = False
self._ask_request_id: Optional[str] = None
self._ask_tool_data: Optional[Dict[str, Any]] = None
self.mma_step_mode: bool = False
self.active_tier: Optional[str] = None
self.ui_focus_agent: Optional[str] = None
self._pending_mma_approval: Optional[Dict[str, Any]] = None
self._mma_approval_open: bool = False
self._mma_approval_edit_mode: bool = False
self._mma_approval_payload: str = ""
self._pending_mma_spawn: Optional[Dict[str, Any]] = None
self._mma_spawn_open: bool = False
self._mma_spawn_edit_mode: bool = False
self._mma_spawn_prompt: str = ''
self._mma_spawn_context: str = ''
self._trigger_blink: bool = False
self._is_blinking: bool = False
self._blink_start_time: float = 0.0
self._trigger_script_blink: bool = False
self._is_script_blinking: bool = False
self._script_blink_start_time: float = 0.0
self._scroll_disc_to_bottom: bool = False
self._scroll_comms_to_bottom: bool = False
self._scroll_tool_calls_to_bottom: bool = False
self._gemini_cache_text: str = ""
self._last_stable_md: str = ''
self._token_stats: Dict[str, Any] = {}
self._token_stats_dirty: bool = False
self.ui_disc_truncate_pairs: int = 2
self.ui_auto_scroll_comms: bool = True
self.ui_auto_scroll_tool_calls: bool = True
self._show_add_ticket_form: bool = False
self._track_discussion_active: bool = False
self._tier_stream_last_len: Dict[str, int] = {}
self.is_viewing_prior_session: bool = False
self.prior_session_entries: List[Dict[str, Any]] = []
self.test_hooks_enabled: bool = ("--enable-test-hooks" in sys.argv) or (os.environ.get("SLOP_TEST_HOOKS") == "1")
self.ui_manual_approve: bool = False
def init_state(self):
"""Initializes the application state from configurations."""
self.config = load_config()
ai_cfg = self.config.get("ai", {})
self._current_provider = ai_cfg.get("provider", "gemini")
self._current_model = ai_cfg.get("model", "gemini-2.5-flash-lite")
self.temperature = ai_cfg.get("temperature", 0.0)
self.max_tokens = ai_cfg.get("max_tokens", 8192)
self.history_trunc_limit = ai_cfg.get("history_trunc_limit", 8000)
projects_cfg = self.config.get("projects", {})
self.project_paths = list(projects_cfg.get("paths", []))
self.active_project_path = projects_cfg.get("active", "")
self._load_active_project()
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
with self._disc_entries_lock:
self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles)
# UI state
self.ui_output_dir = self.project.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = self.project.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = self.project.get("screenshots", {}).get("base_dir", ".")
proj_meta = self.project.get("project", {})
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_word_wrap = proj_meta.get("word_wrap", True)
self.ui_summary_only = proj_meta.get("summary_only", False)
self.ui_auto_add_history = disc_sec.get("auto_add", False)
self.ui_global_system_prompt = self.config.get("ai", {}).get("system_prompt", "")
_default_windows = {
"Context Hub": True,
"Files & Media": True,
"AI Settings": True,
"MMA Dashboard": True,
"Tier 1: Strategy": True,
"Tier 2: Tech Lead": True,
"Tier 3: Workers": True,
"Tier 4: QA": True,
"Discussion Hub": True,
"Operations Hub": True,
"Theme": True,
"Log Management": False,
"Diagnostics": False,
}
saved = self.config.get("gui", {}).get("show_windows", {})
self.show_windows = {k: saved.get(k, v) for k, v in _default_windows.items()}
agent_tools_cfg = self.project.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
label = self.project.get("project", {}).get("name", "")
session_logger.open_session(label=label)
def cb_load_prior_log(self) -> None:
root = hide_tk_root()
path = filedialog.askopenfilename(
title="Load Session Log",
initialdir="logs",
filetypes=[("Log/JSONL", "*.log *.jsonl"), ("All Files", "*.*")]
)
root.destroy()
if not path:
return
entries = []
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
self.ai_status = f"log load error: {e}"
return
self.prior_session_entries = entries
self.is_viewing_prior_session = True
self.ai_status = f"viewing prior session: {Path(path).name} ({len(entries)} entries)"
def _load_active_project(self) -> None:
"""Loads the active project configuration, with fallbacks."""
if self.active_project_path and Path(self.active_project_path).exists():
try:
self.project = project_manager.load_project(self.active_project_path)
return
except Exception as e:
print(f"Failed to load project {self.active_project_path}: {e}")
for pp in self.project_paths:
if Path(pp).exists():
try:
self.project = project_manager.load_project(pp)
self.active_project_path = pp
return
except Exception:
continue
self.project = project_manager.migrate_from_legacy_config(self.config)
name = self.project.get("project", {}).get("name", "project")
fallback_path = f"{name}.toml"
project_manager.save_project(self.project, fallback_path)
self.active_project_path = fallback_path
if fallback_path not in self.project_paths:
self.project_paths.append(fallback_path)
def _prune_old_logs(self) -> None:
"""Asynchronously prunes old insignificant logs on startup."""
def run_prune() -> None:
try:
registry = LogRegistry("logs/log_registry.toml")
pruner = LogPruner(registry, "logs")
pruner.prune()
except Exception as e:
print(f"Error during log pruning: {e}")
thread = threading.Thread(target=run_prune, daemon=True)
thread.start()
def _fetch_models(self, provider: str) -> None:
self.ai_status = "fetching models..."
def do_fetch() -> None:
try:
models = ai_client.list_models(provider)
self.available_models = models
if self.current_model not in models and models:
self.current_model = models[0]
ai_client.set_provider(self._current_provider, self.current_model)
self.ai_status = f"models loaded: {len(models)}"
except Exception as e:
self.ai_status = f"model fetch error: {e}"
self.models_thread = threading.Thread(target=do_fetch, daemon=True)
self.models_thread.start()
def start_services(self, app: Any = None):
"""Starts background threads and async event loop."""
self._prune_old_logs()
self._init_ai_and_hooks(app)
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
def stop_services(self) -> None:
"""Stops background threads and cleans up resources."""
import ai_client
ai_client.cleanup()
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._loop_thread and self._loop_thread.is_alive():
self._loop_thread.join(timeout=2.0)
def _init_ai_and_hooks(self, app: Any = None) -> None:
import api_hooks
ai_client.set_provider(self._current_provider, self._current_model)
if self._current_provider == "gemini_cli":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
ai_client.tool_log_callback = self._on_tool_log
mcp_client.perf_monitor_callback = self.perf_monitor.get_metrics
self.perf_monitor.alert_callback = self._on_performance_alert
ai_client.events.on("request_start", self._on_api_event)
ai_client.events.on("response_received", self._on_api_event)
ai_client.events.on("tool_execution", self._on_api_event)
self._settable_fields: Dict[str, str] = {
'ai_input': 'ui_ai_input',
'project_git_dir': 'ui_project_git_dir',
'auto_add_history': 'ui_auto_add_history',
'disc_new_name_input': 'ui_disc_new_name_input',
'project_main_context': 'ui_project_main_context',
'gcli_path': 'ui_gemini_cli_path',
'output_dir': 'ui_output_dir',
'files_base_dir': 'ui_files_base_dir',
'ai_status': 'ai_status',
'ai_response': 'ai_response',
'active_discussion': 'active_discussion',
'current_provider': 'current_provider',
'current_model': 'current_model',
'token_budget_pct': '_token_budget_pct',
'token_budget_current': '_token_budget_current',
'token_budget_label': '_token_budget_label',
'show_confirm_modal': 'show_confirm_modal',
'mma_epic_input': 'ui_epic_input',
'mma_status': 'mma_status',
'mma_active_tier': 'active_tier',
'ui_new_track_name': 'ui_new_track_name',
'ui_new_track_desc': 'ui_new_track_desc',
'manual_approve': 'ui_manual_approve'
}
self.hook_server = api_hooks.HookServer(app if app else self)
self.hook_server.start()
def _run_event_loop(self):
"""Internal loop runner."""
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._process_event_queue())
self._loop.run_forever()
async def _process_event_queue(self) -> None:
"""Listens for and processes events from the AsyncEventQueue."""
while True:
event_name, payload = await self.event_queue.get()
if event_name == "user_request":
self._loop.run_in_executor(None, self._handle_request_event, payload)
elif event_name == "response":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": payload
})
elif event_name == "mma_state_update":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_state_update",
"payload": payload
})
elif event_name == "mma_stream":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_stream_append",
"payload": payload
})
elif event_name in ("mma_spawn_approval", "mma_step_approval"):
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append(payload)
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
"""Processes a UserRequestEvent by calling the AI client."""
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": event.prompt,
"collapsed": False,
"ts": project_manager.now_ts()
})
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
ai_client.set_model_params(self.temperature, self.max_tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
try:
resp = ai_client.send(
event.stable_md,
event.prompt,
event.base_dir,
event.file_items,
event.disc_text,
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis
)
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": resp, "status": "done"}),
self._loop
)
except ProviderError as e:
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": e.ui_message(), "status": "error", "role": "Vendor API"}),
self._loop
)
except Exception as e:
asyncio.run_coroutine_threadsafe(
self.event_queue.put("response", {"text": f"ERROR: {e}", "status": "error", "role": "System"}),
self._loop
)
def _on_comms_entry(self, entry: Dict[str, Any]) -> None:
session_logger.log_comms(entry)
entry["local_ts"] = time.time()
kind = entry.get("kind")
payload = entry.get("payload", {})
if kind in ("tool_result", "tool_call"):
role = "Tool" if kind == "tool_result" else "Vendor API"
content = ""
if kind == "tool_result":
content = payload.get("output", "")
else:
content = payload.get("script") or payload.get("args") or payload.get("message", "")
if isinstance(content, dict):
content = json.dumps(content, indent=1)
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": role,
"content": f"[{kind.upper().replace('_', ' ')}]\n{content}",
"collapsed": True,
"ts": entry.get("ts", project_manager.now_ts())
})
if kind == "history_add":
payload = entry.get("payload", {})
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": payload.get("role", "AI"),
"content": payload.get("content", ""),
"collapsed": payload.get("collapsed", False),
"ts": entry.get("ts", project_manager.now_ts())
})
return
with self._pending_comms_lock:
self._pending_comms.append(entry)
def _on_tool_log(self, script: str, result: str) -> None:
session_logger.log_tool_call(script, result, None)
source_tier = ai_client.current_tier
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
def _on_api_event(self, *args: Any, **kwargs: Any) -> None:
payload = kwargs.get("payload", {})
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({"action": "refresh_api_metrics", "payload": payload})
def _on_performance_alert(self, message: str) -> None:
alert_text = f"[PERFORMANCE ALERT] {message}. Please consider optimizing recent changes or reducing load."
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "System",
"content": alert_text,
"ts": project_manager.now_ts()
})
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> Optional[str]:
if self.test_hooks_enabled and not getattr(self, "ui_manual_approve", False):
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(script, base_dir, qa_callback=qa_callback)
self._append_tool_log(script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
dialog = ConfirmDialog(script, base_dir)
is_headless = "--headless" in sys.argv
if is_headless:
with self._pending_dialog_lock:
self._pending_actions[dialog._uid] = dialog
else:
with self._pending_dialog_lock:
self._pending_dialog = dialog
if self.test_hooks_enabled and hasattr(self, '_api_event_queue'):
with self._api_event_queue_lock:
self._api_event_queue.append({
"type": "script_confirmation_required",
"action_id": dialog._uid,
"script": str(script),
"base_dir": str(base_dir),
"ts": time.time()
})
approved, final_script = dialog.wait()
if is_headless:
with self._pending_dialog_lock:
if dialog._uid in self._pending_actions:
del self._pending_actions[dialog._uid]
if not approved:
self._append_tool_log(final_script, "REJECTED by user")
return None
self.ai_status = "running powershell..."
output = shell_runner.run_powershell(final_script, base_dir, qa_callback=qa_callback)
self._append_tool_log(final_script, output)
self.ai_status = "powershell done, awaiting AI..."
return output
def _append_tool_log(self, script: str, result: str, source_tier: str | None = None) -> None:
self._tool_log.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
self.ui_last_script_text = script
self.ui_last_script_output = result
self._trigger_script_blink = True
self.show_script_output = True
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
elif self._pending_dialog and self._pending_dialog._uid == action_id:
dialog = self._pending_dialog
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
return False
@property
def current_provider(self) -> str:
return self._current_provider
@current_provider.setter
def current_provider(self, value: str) -> None:
if value != self._current_provider:
self._current_provider = value
ai_client.reset_session()
ai_client.set_provider(value, self.current_model)
self._token_stats = {}
self._token_stats_dirty = True
@property
def current_model(self) -> str:
return self._current_model
@current_model.setter
def current_model(self, value: str) -> None:
if value != self._current_model:
self._current_model = value
ai_client.reset_session()
ai_client.set_provider(self.current_provider, value)
self._token_stats = {}
self._token_stats_dirty = True
def create_api(self) -> FastAPI:
"""Creates and configures the FastAPI application for headless mode."""
api = FastAPI(title="Manual Slop Headless API")
API_KEY_NAME = "X-API-KEY"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def get_api_key(header_key: str = Depends(api_key_header)) -> str:
"""Validates the API key from the request header against configuration."""
headless_cfg = self.config.get("headless", {})
config_key = headless_cfg.get("api_key", "").strip()
env_key = os.environ.get("SLOP_API_KEY", "").strip()
target_key = env_key or config_key
if not target_key:
raise HTTPException(status_code=403, detail="API Key not configured on server")
if header_key == target_key:
return header_key
raise HTTPException(status_code=403, detail="Could not validate API Key")
@api.get("/health")
def health() -> dict[str, str]:
"""Returns the health status of the API."""
return {"status": "ok"}
@api.get("/status", dependencies=[Depends(get_api_key)])
def status() -> dict[str, Any]:
"""Returns the current status of the application."""
return {
"provider": self.current_provider,
"model": self.current_model,
"status": self.ai_status,
"usage": self.session_usage
}
@api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
def generate(req: GenerateRequest) -> dict[str, Any]:
"""Triggers an AI generation request using the current project context."""
if not req.prompt.strip():
raise HTTPException(status_code=400, detail="Prompt cannot be empty")
with self._send_thread_lock:
start_time = time.time()
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}")
user_msg = req.prompt
base_dir = self.ui_files_base_dir
csp = filter(bool, [self.ui_global_system_prompt.strip(), self.ui_project_system_prompt.strip()])
ai_client.set_custom_system_prompt("\n\n".join(csp))
temp = req.temperature if req.temperature is not None else self.temperature
tokens = req.max_tokens if req.max_tokens is not None else self.max_tokens
ai_client.set_model_params(temp, tokens, self.history_trunc_limit)
ai_client.set_agent_tools(self.ui_agent_tools)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "User",
"content": user_msg,
"collapsed": False,
"ts": project_manager.now_ts()
})
try:
resp = ai_client.send(stable_md, user_msg, base_dir, self.last_file_items, disc_text)
if req.auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
"role": "AI",
"content": resp,
"collapsed": False,
"ts": project_manager.now_ts()
})
self._recalculate_session_usage()
duration = time.time() - start_time
return {
"text": resp,
"metadata": {
"provider": self.current_provider,
"model": self.current_model,
"duration_sec": round(duration, 3),
"timestamp": project_manager.now_ts()
},
"usage": self.session_usage
}
except ProviderError as e:
raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}")
@api.post("/api/v1/stream", dependencies=[Depends(get_api_key)])
async def stream(req: GenerateRequest) -> Any:
"""Placeholder for streaming AI generation responses (Not yet implemented)."""
raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.")
@api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
def pending_actions() -> list[dict[str, Any]]:
"""Lists all pending PowerShell scripts awaiting confirmation."""
with self._pending_dialog_lock:
return [
{"action_id": uid, "script": diag._script, "base_dir": diag._base_dir}
for uid, diag in self._pending_actions.items()
]
@api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, str]:
"""Approves or rejects a pending action."""
with self._pending_dialog_lock:
if action_id not in self._pending_actions:
raise HTTPException(status_code=404, detail="Action not found")
dialog = self._pending_actions.pop(action_id)
if req.script is not None:
dialog._script = req.script
with dialog._condition:
dialog._approved = req.approved
dialog._done = True
dialog._condition.notify_all()
return {"status": "confirmed" if req.approved else "rejected"}
@api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
def list_sessions() -> list[str]:
"""Lists all session log files."""
log_dir = Path("logs")
if not log_dir.exists():
return []
return [f.name for f in log_dir.glob("*.log")]
@api.get("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)])
def get_session(session_id: str) -> dict[str, Any]:
"""Returns the content of a specific session log."""
log_path = Path("logs") / session_id
if not log_path.exists():
raise HTTPException(status_code=404, detail="Session log not found")
return {"id": session_id, "content": log_path.read_text(encoding="utf-8", errors="replace")}
@api.delete("/api/v1/sessions/{session_id}", dependencies=[Depends(get_api_key)])
def delete_session(session_id: str) -> dict[str, str]:
"""Deletes a specific session log."""
log_path = Path("logs") / session_id
if not log_path.exists():
raise HTTPException(status_code=404, detail="Session log not found")
log_path.unlink()
return {"status": "deleted"}
@api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
def get_context() -> dict[str, Any]:
"""Returns the current aggregated project context."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
# Pull current screenshots if available in project
screenshots = self.project.get("screenshots", {}).get("paths", [])
return {
"files": [f.get("path") if isinstance(f, dict) else str(f) for f in file_items],
"screenshots": screenshots,
"files_base_dir": self.ui_files_base_dir,
"markdown": md,
"discussion": disc_text
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Context aggregation failure: {e}")
@api.get("/api/v1/token_stats", dependencies=[Depends(get_api_key)])
def token_stats() -> dict[str, Any]:
"""Returns current token usage and budget statistics."""
return self._token_stats
return api
def _cb_new_project_automated(self, user_data: Any) -> None:
if user_data:
name = Path(user_data).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, user_data)
if user_data not in self.project_paths:
self.project_paths.append(user_data)
self._switch_project(user_data)
def _cb_project_save(self) -> None:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
self.ai_status = "config saved"
def _cb_disc_create(self) -> None:
nm = self.ui_disc_new_name_input.strip()
if nm:
self._create_discussion(nm)
self.ui_disc_new_name_input = ""
def _switch_project(self, path: str) -> None:
if not Path(path).exists():
self.ai_status = f"project file not found: {path}"
return
self._flush_to_project()
self._save_active_project()
try:
self.project = project_manager.load_project(path)
self.active_project_path = path
except Exception as e:
self.ai_status = f"failed to load project: {e}"
return
self._refresh_from_project()
self.ai_status = f"switched to: {Path(path).stem}"
def _refresh_from_project(self) -> None:
self.files = list(self.project.get("files", {}).get("paths", []))
self.screenshots = list(self.project.get("screenshots", {}).get("paths", []))
disc_sec = self.project.get("discussion", {})
self.disc_roles = list(disc_sec.get("roles", list(DISC_ROLES)))
self.active_discussion = disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(self.active_discussion, {})
with self._disc_entries_lock:
self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles)
proj = self.project
self.ui_output_dir = proj.get("output", {}).get("output_dir", "./md_gen")
self.ui_files_base_dir = proj.get("files", {}).get("base_dir", ".")
self.ui_shots_base_dir = proj.get("screenshots", {}).get("base_dir", ".")
self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "")
self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "")
self.ui_project_main_context = proj.get("project", {}).get("main_context", "")
self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True)
self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True)
self.ui_word_wrap = proj.get("project", {}).get("word_wrap", True)
self.ui_summary_only = proj.get("project", {}).get("summary_only", False)
agent_tools_cfg = proj.get("agent", {}).get("tools", {})
self.ui_agent_tools = {t: agent_tools_cfg.get(t, True) for t in AGENT_TOOL_NAMES}
# MMA Tracks
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
# Restore MMA state
mma_sec = proj.get("mma", {})
self.ui_epic_input = mma_sec.get("epic", "")
at_data = mma_sec.get("active_track")
if at_data:
try:
tickets = []
for t_data in at_data.get("tickets", []):
tickets.append(Ticket(**t_data))
self.active_track = Track(
id=at_data.get("id"),
description=at_data.get("description"),
tickets=tickets
)
self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table
except Exception as e:
print(f"Failed to deserialize active track: {e}")
self.active_track = None
else:
self.active_track = None
self.active_tickets = []
# Load track-scoped history if track is active
if self.active_track:
track_history = project_manager.load_track_history(self.active_track.id, self.ui_files_base_dir)
if track_history:
with self._disc_entries_lock:
self.disc_entries = parse_history_entries(track_history, self.disc_roles)
def _cb_load_track(self, track_id: str) -> None:
state = project_manager.load_track_state(track_id, self.ui_files_base_dir)
if state:
try:
# Convert list[Ticket] or list[dict] to list[Ticket] for Track object
tickets = []
for t in state.tasks:
if isinstance(t, dict):
tickets.append(Ticket(**t))
else:
tickets.append(t)
self.active_track = Track(
id=state.metadata.id,
description=state.metadata.name,
tickets=tickets
)
# Keep dicts for UI table (or convert Ticket objects back to dicts if needed)
self.active_tickets = [asdict(t) if not isinstance(t, dict) else t for t in tickets]
# Load track-scoped history
history = project_manager.load_track_history(track_id, self.ui_files_base_dir)
with self._disc_entries_lock:
if history:
self.disc_entries = parse_history_entries(history, self.disc_roles)
else:
self.disc_entries = []
self._recalculate_session_usage()
self.ai_status = f"Loaded track: {state.metadata.name}"
except Exception as e:
self.ai_status = f"Load track error: {e}"
print(f"Error loading track {track_id}: {e}")
def _save_active_project(self) -> None:
if self.active_project_path:
try:
project_manager.save_project(self.project, self.active_project_path)
except Exception as e:
self.ai_status = f"save error: {e}"
def _get_discussion_names(self) -> list[str]:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
return sorted(discussions.keys())
def _switch_discussion(self, name: str) -> None:
self._flush_disc_entries_to_project()
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if name not in discussions:
self.ai_status = f"discussion not found: {name}"
return
self.active_discussion = name
self._track_discussion_active = False
disc_sec["active"] = name
disc_data = discussions[name]
with self._disc_entries_lock:
self.disc_entries = parse_history_entries(disc_data.get("history", []), self.disc_roles)
self.ai_status = f"discussion: {name}"
def _flush_disc_entries_to_project(self) -> None:
history_strings = [project_manager.entry_to_str(e) for e in self.disc_entries]
if self.active_track and self._track_discussion_active:
project_manager.save_track_history(self.active_track.id, history_strings, self.ui_files_base_dir)
return
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
disc_data = discussions.setdefault(self.active_discussion, project_manager.default_discussion())
disc_data["history"] = history_strings
disc_data["last_updated"] = project_manager.now_ts()
def _create_discussion(self, name: str) -> None:
disc_sec = self.project.setdefault("discussion", {})
discussions = disc_sec.setdefault("discussions", {})
if name in discussions:
self.ai_status = f"discussion '{name}' already exists"
return
discussions[name] = project_manager.default_discussion()
self._switch_discussion(name)
def _rename_discussion(self, old_name: str, new_name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if old_name not in discussions:
return
if new_name in discussions:
self.ai_status = f"discussion '{new_name}' already exists"
return
discussions[new_name] = discussions.pop(old_name)
if self.active_discussion == old_name:
self.active_discussion = new_name
disc_sec["active"] = new_name
def _delete_discussion(self, name: str) -> None:
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if len(discussions) <= 1:
self.ai_status = "cannot delete the last discussion"
return
if name not in discussions:
return
del discussions[name]
if self.active_discussion == name:
remaining = sorted(discussions.keys())
self._switch_discussion(remaining[0])
def _handle_mma_respond(self, approved: bool, payload: str | None = None, abort: bool = False, prompt: str | None = None, context_md: str | None = None) -> None:
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
if self._pending_mma_spawn:
spawn_dlg = self._pending_mma_spawn.get("dialog_container", [None])[0]
if spawn_dlg:
with spawn_dlg._condition:
spawn_dlg._approved = approved
spawn_dlg._abort = abort
if prompt is not None:
spawn_dlg._prompt = prompt
if context_md is not None:
spawn_dlg._context_md = context_md
spawn_dlg._done = True
spawn_dlg._condition.notify_all()
self._pending_mma_spawn = None
def _handle_approve_ask(self) -> None:
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
def do_post() -> None:
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": True}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self) -> None:
"""Responds with rejection for a pending /api/ask request."""
if not self._ask_request_id: return
request_id = self._ask_request_id
def do_post() -> None:
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": False}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
threading.Thread(target=do_post, daemon=True).start()
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self) -> None:
"""Logic for resetting the AI session."""
ai_client.reset_session()
ai_client.clear_comms_log()
self._tool_log.clear()
self._comms_log.clear()
self.disc_entries.clear()
# Clear history in project dict too
disc_sec = self.project.get("discussion", {})
discussions = disc_sec.get("discussions", {})
if self.active_discussion in discussions:
discussions[self.active_discussion]["history"] = []
self.ai_status = "session reset"
self.ai_response = ""
self.ui_ai_input = ""
with self._pending_history_adds_lock:
self._pending_history_adds.clear()
def _handle_md_only(self) -> None:
"""Logic for the 'MD Only' action."""
try:
md, path, *_ = self._do_generate()
self.last_md = md
self.last_md_path = path
self.ai_status = f"md written: {path.name}"
# Refresh token budget metrics with CURRENT md
self._refresh_api_metrics({}, md_content=md)
except Exception as e:
self.ai_status = f"error: {e}"
def _handle_generate_send(self) -> None:
"""Logic for the 'Gen + Send' action."""
try:
md, path, file_items, stable_md, disc_text = self._do_generate()
self._last_stable_md = stable_md
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
self.ai_status = f"generate error: {e}"
return
self.ai_status = "sending..."
user_msg = self.ui_ai_input
base_dir = self.ui_files_base_dir
# Prepare event payload
event_payload = events.UserRequestEvent(
prompt=user_msg,
stable_md=stable_md,
file_items=file_items,
disc_text=disc_text,
base_dir=base_dir
)
# Push to async queue
asyncio.run_coroutine_threadsafe(
self.event_queue.put("user_request", event_payload),
self._loop
)
def _recalculate_session_usage(self) -> None:
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]:
if k in usage:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict[str, Any], md_content: str | None = None) -> None:
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
if md_content is not None:
stats = ai_client.get_token_stats(md_content)
# Ensure compatibility if keys are named differently
if "total_tokens" in stats and "estimated_prompt_tokens" not in stats:
stats["estimated_prompt_tokens"] = stats["total_tokens"]
self._token_stats = stats
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
self._gemini_cache_text = f"Gemini Caches: {count} ({size_bytes / 1024:.1f} KB)"
def _flush_to_project(self) -> None:
proj = self.project
proj.setdefault("output", {})["output_dir"] = self.ui_output_dir
proj.setdefault("files", {})["base_dir"] = self.ui_files_base_dir
proj["files"]["paths"] = self.files
proj.setdefault("screenshots", {})["base_dir"] = self.ui_shots_base_dir
proj["screenshots"]["paths"] = self.screenshots
proj.setdefault("project", {})
proj["project"]["git_dir"] = self.ui_project_git_dir
proj["project"]["system_prompt"] = self.ui_project_system_prompt
proj["project"]["main_context"] = self.ui_project_main_context
proj["project"]["word_wrap"] = self.ui_word_wrap
proj["project"]["summary_only"] = self.ui_summary_only
proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms
proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls
proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in AGENT_TOOL_NAMES:
proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True)
self._flush_disc_entries_to_project()
disc_sec = proj.setdefault("discussion", {})
disc_sec["roles"] = self.disc_roles
disc_sec["active"] = self.active_discussion
disc_sec["auto_add"] = self.ui_auto_add_history
# Save MMA State
mma_sec = proj.setdefault("mma", {})
mma_sec["epic"] = self.ui_epic_input
if self.active_track:
mma_sec["active_track"] = asdict(self.active_track)
else:
mma_sec["active_track"] = None
def _flush_to_config(self) -> None:
self.config["ai"] = {
"provider": self.current_provider,
"model": self.current_model,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"history_trunc_limit": self.history_trunc_limit,
}
self.config["ai"]["system_prompt"] = self.ui_global_system_prompt
self.config["projects"] = {"paths": self.project_paths, "active": self.active_project_path}
self.config["gui"] = {"show_windows": self.show_windows}
theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list[dict[str, Any]], str, str]:
"""Returns (full_md, output_path, file_items, stable_md, discussion_text)."""
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
save_config(self.config)
track_id = self.active_track.id if self.active_track else None
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id)
full_md, path, file_items = aggregate.run(flat)
# Build stable markdown (no history) for Gemini caching
screenshot_base_dir = Path(flat.get("screenshots", {}).get("base_dir", "."))
screenshots = flat.get("screenshots", {}).get("paths", [])
summary_only = flat.get("project", {}).get("summary_only", False)
stable_md = aggregate.build_markdown_no_history(file_items, screenshot_base_dir, screenshots, summary_only=summary_only)
# Build discussion history text separately
history = flat.get("discussion", {}).get("history", [])
discussion_text = aggregate.build_discussion_text(history)
return full_md, path, file_items, stable_md, discussion_text
def _cb_plan_epic(self) -> None:
def _bg_task() -> None:
try:
self.ai_status = "Planning Epic (Tier 1)..."
history = orchestrator_pm.get_track_history_summary()
proj = project_manager.load_project(self.active_project_path)
flat = project_manager.flat_config(proj)
file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))
_t1_baseline = len(ai_client.get_comms_log())
tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
_t1_new = ai_client.get_comms_log()[_t1_baseline:]
_t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp)
_t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp)
def _push_t1_usage(i: int, o: int) -> None:
self.mma_tier_usage["Tier 1"]["input"] += i
self.mma_tier_usage["Tier 1"]["output"] += o
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "custom_callback",
"callback": _push_t1_usage,
"args": [_t1_in, _t1_out]
})
self._pending_gui_tasks.append({
"action": "handle_ai_response",
"payload": {
"text": json.dumps(tracks, indent=2),
"stream_id": "Tier 1",
"status": "Epic tracks generated."
}
})
self._pending_gui_tasks.append({
"action": "show_track_proposal",
"payload": tracks
})
except Exception as e:
self.ai_status = f"Epic plan error: {e}"
print(f"ERROR in _cb_plan_epic background task: {e}")
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_accept_tracks(self) -> None:
self._show_track_proposal_modal = False
def _bg_task() -> None:
# Generate skeletons once
self.ai_status = "Phase 2: Generating skeletons for all tracks..."
parser = ASTParser(language="python")
generated_skeletons = ""
try:
for i, file_path in enumerate(self.files):
try:
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
abs_path = Path(self.ui_files_base_dir) / file_path
if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f:
code = f.read()
generated_skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
except Exception as e:
print(f"Error parsing skeleton for {file_path}: {e}")
except Exception as e:
self.ai_status = f"Error generating skeletons: {e}"
print(f"Error generating skeletons: {e}")
return # Exit if skeleton generation fails
# Now loop through tracks and call _start_track_logic with generated skeletons
total_tracks = len(self.proposed_tracks)
for i, track_data in enumerate(self.proposed_tracks):
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..."
self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started
self.ai_status = f"All {total_tracks} tracks accepted and execution started."
threading.Thread(target=_bg_task, daemon=True).start()
def _cb_start_track(self, user_data: Any = None) -> None:
if isinstance(user_data, str):
# If track_id is provided directly
track_id = user_data
# Ensure it's loaded as active
if not self.active_track or self.active_track.id != track_id:
self._cb_load_track(track_id)
if self.active_track:
# Use the active track object directly to start execution
self.mma_status = "running"
engine = multi_agent_conductor.ConductorEngine(self.active_track, self.event_queue, auto_queue=not self.mma_step_mode)
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=self.active_track.id)
full_md, _, _ = aggregate.run(flat)
asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
self.ai_status = f"Track '{self.active_track.description}' started."
return
idx = 0
if isinstance(user_data, int):
idx = user_data
elif isinstance(user_data, dict):
idx = user_data.get("index", 0)
if 0 <= idx < len(self.proposed_tracks):
track_data = self.proposed_tracks[idx]
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
self.ai_status = f"Track '{title}' started."
def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
try:
goal = track_data.get("goal", "")
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
self.ai_status = f"Phase 2: Generating tickets for {title}..."
skeletons = "" # Initialize skeletons variable
if skeletons_str is None: # Only generate if not provided
# 1. Get skeletons for context
parser = ASTParser(language="python")
for i, file_path in enumerate(self.files):
try:
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
abs_path = Path(self.ui_files_base_dir) / file_path
if abs_path.exists() and abs_path.suffix == ".py":
with open(abs_path, "r", encoding="utf-8") as f:
code = f.read()
skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
except Exception as e:
print(f"Error parsing skeleton for {file_path}: {e}")
else:
skeletons = skeletons_str # Use provided skeletons
self.ai_status = "Phase 2: Calling Tech Lead..."
_t2_baseline = len(ai_client.get_comms_log())
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
_t2_new = ai_client.get_comms_log()[_t2_baseline:]
_t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
_t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
self.mma_tier_usage["Tier 2"]["input"] += _t2_in
self.mma_tier_usage["Tier 2"]["output"] += _t2_out
if not raw_tickets:
self.ai_status = f"Error: No tickets generated for track: {title}"
print(f"Warning: No tickets generated for track: {title}")
return
self.ai_status = "Phase 2: Sorting tickets..."
try:
sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets)
except ValueError as e:
print(f"Dependency error in track '{title}': {e}")
sorted_tickets_data = raw_tickets
# 3. Create Track and Ticket objects
tickets = []
for t_data in sorted_tickets_data:
ticket = Ticket(
id=t_data["id"],
description=t_data.get("description") or t_data.get("goal", "No description"),
status=t_data.get("status", "todo"),
assigned_to=t_data.get("assigned_to", "unassigned"),
depends_on=t_data.get("depends_on", []),
step_mode=t_data.get("step_mode", False)
)
tickets.append(ticket)
track_id = f"track_{uuid.uuid5(uuid.NAMESPACE_DNS, f'{self.active_project_path}_{title}').hex[:12]}"
track = Track(id=track_id, description=title, tickets=tickets)
# Initialize track state in the filesystem
from src.models import TrackState, Metadata
meta = Metadata(id=track_id, name=title, status="todo", created_at=datetime.now(), updated_at=datetime.now())
state = TrackState(metadata=meta, discussion=[], tasks=tickets)
project_manager.save_track_state(track_id, state, self.ui_files_base_dir)
# 4. Initialize ConductorEngine and run loop
engine = multi_agent_conductor.ConductorEngine(track, self.event_queue, auto_queue=not self.mma_step_mode)
# Use current full markdown context for the track execution
track_id_param = track.id
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param)
full_md, _, _ = aggregate.run(flat)
# Schedule the coroutine on the internal event loop
asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
except Exception as e:
self.ai_status = f"Track start error: {e}"
print(f"ERROR in _start_track_logic: {e}")
def _cb_ticket_retry(self, ticket_id: str) -> None:
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'todo'
break
asyncio.run_coroutine_threadsafe(
self.event_queue.put("mma_retry", {"ticket_id": ticket_id}),
self._loop
)
def _cb_ticket_skip(self, ticket_id: str) -> None:
for t in self.active_tickets:
if t.get('id') == ticket_id:
t['status'] = 'skipped'
break
asyncio.run_coroutine_threadsafe(
self.event_queue.put("mma_skip", {"ticket_id": ticket_id}),
self._loop
)
def _cb_run_conductor_setup(self) -> None:
base = Path("conductor")
if not base.exists():
self.ui_conductor_setup_summary = "Error: conductor/ directory not found."
return
files = list(base.glob("**/*"))
files = [f for f in files if f.is_file()]
summary = [f"Conductor Directory: {base.absolute()}"]
summary.append(f"Total Files: {len(files)}")
total_lines = 0
for f in files:
try:
with open(f, "r", encoding="utf-8") as fd:
lines = len(fd.readlines())
total_lines += lines
summary.append(f"- {f.relative_to(base)}: {lines} lines")
except Exception:
summary.append(f"- {f.relative_to(base)}: Error reading")
summary.append(f"Total Line Count: {total_lines}")
tracks_dir = base / "tracks"
if tracks_dir.exists():
tracks = [d for d in tracks_dir.iterdir() if d.is_dir()]
summary.append(f"Total Tracks Found: {len(tracks)}")
else:
summary.append("Tracks Directory: Not found")
self.ui_conductor_setup_summary = "\n".join(summary)
def _cb_create_track(self, name: str, desc: str, track_type: str) -> None:
if not name: return
date_suffix = datetime.now().strftime("%Y%m%d")
track_id = f"{name.lower().replace(' ', '_')}_{date_suffix}"
track_dir = Path("conductor/tracks") / track_id
track_dir.mkdir(parents=True, exist_ok=True)
spec_file = track_dir / "spec.md"
with open(spec_file, "w", encoding="utf-8") as f:
f.write(f"# Specification: {name}\n\nType: {track_type}\n\nDescription: {desc}\n")
plan_file = track_dir / "plan.md"
with open(plan_file, "w", encoding="utf-8") as f:
f.write(f"# Implementation Plan: {name}\n\n- [ ] Task 1: Initialize\n")
meta_file = track_dir / "metadata.json"
with open(meta_file, "w", encoding="utf-8") as f:
json.dump({
"id": track_id,
"title": name,
"description": desc,
"type": track_type,
"status": "new",
"progress": 0.0
}, f, indent=1)
# Refresh tracks from disk
self.tracks = project_manager.get_all_tracks(self.ui_files_base_dir)
def _push_mma_state_update(self) -> None:
if not self.active_track:
return
# Sync active_tickets (list of dicts) back to active_track.tickets (list of Ticket objects)
self.active_track.tickets = [Ticket.from_dict(t) for t in self.active_tickets]
# Save the state to disk
from src.project_manager import save_track_state, load_track_state
from src.models import TrackState, Metadata
existing = load_track_state(self.active_track.id, self.ui_files_base_dir)
meta = Metadata(
id=self.active_track.id,
name=self.active_track.description,
status=self.mma_status,
created_at=existing.metadata.created_at if existing else datetime.now(),
updated_at=datetime.now()
)
state = TrackState(
metadata=meta,
discussion=existing.discussion if existing else [],
tasks=self.active_track.tickets
)
save_track_state(self.active_track.id, state, self.ui_files_base_dir)