chore(tech-debt): Finalize gui_2.py cleanup and test suite discipline

This commit is contained in:
2026-03-02 21:43:56 -05:00
parent 8af1bcd960
commit a569f8c02f
26 changed files with 58 additions and 364 deletions

110
gui_2.py
View File

@@ -122,7 +122,6 @@ class ConfirmDialog:
class MMAApprovalDialog:
def __init__(self, ticket_id: str, payload: str) -> None:
self._ticket_id = ticket_id
self._payload = payload
self._condition = threading.Condition()
self._done = False
@@ -136,8 +135,6 @@ class MMAApprovalDialog:
class MMASpawnApprovalDialog:
def __init__(self, ticket_id: str, role: str, prompt: str, context_md: str) -> None:
self._ticket_id = ticket_id
self._role = role
self._prompt = prompt
self._context_md = context_md
self._condition = threading.Condition()
@@ -301,9 +298,6 @@ class App:
self._scroll_tool_calls_to_bottom = False
self._pending_gui_tasks: list[dict[str, Any]] = []
self.session_usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
self._token_budget_pct = 0.0
self._token_budget_current = 0
self._token_budget_limit = 0
self._gemini_cache_text = ""
self._last_stable_md: str = ''
self._token_stats: dict = {}
@@ -466,95 +460,6 @@ class App:
return header_key
raise HTTPException(status_code=403, detail="Could not validate API Key")
@api.get("/health")
def health() -> dict[str, str]:
"""Basic health check endpoint."""
return {"status": "ok"}
@api.get("/status", dependencies=[Depends(get_api_key)])
def status() -> dict[str, Any]:
"""Returns the current status of the AI provider and active project."""
return {
"provider": self.current_provider,
"model": self.current_model,
"active_project": self.active_project_path,
"ai_status": self.ai_status,
"session_usage": self.session_usage
}
@api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
def pending_actions() -> list[dict[str, Any]]:
"""Lists all PowerShell scripts awaiting manual confirmation."""
actions = []
with self._pending_dialog_lock:
for uid, dialog in self._pending_actions.items():
actions.append({
"action_id": uid,
"script": dialog._script,
"base_dir": dialog._base_dir
})
if self._pending_dialog:
actions.append({
"action_id": self._pending_dialog._uid,
"script": self._pending_dialog._script,
"base_dir": self._pending_dialog._base_dir
})
return actions
@api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, Any]:
"""Approves or denies a pending PowerShell script execution."""
success = self.resolve_pending_action(action_id, req.approved)
if not success:
raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found")
return {"status": "success", "action_id": action_id, "approved": req.approved}
@api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
def list_sessions() -> list[str]:
"""Lists all available session log files."""
log_dir = Path("logs")
if not log_dir.exists():
return []
return sorted([f.name for f in log_dir.glob("*.log")], reverse=True)
@api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def get_session(filename: str) -> dict[str, str]:
"""Retrieves the content of a specific session log file."""
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
log_path = Path("logs") / filename
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Session log not found")
try:
content = log_path.read_text(encoding="utf-8")
return {"filename": filename, "content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def delete_session(filename: str) -> dict[str, str]:
"""Deletes a specific session log file."""
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
log_path = Path("logs") / filename
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Session log not found")
try:
log_path.unlink()
return {"status": "success", "message": f"Deleted {filename}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
def get_context() -> dict[str, Any]:
"""Returns the current file and screenshot context configuration."""
return {
"files": self.files,
"screenshots": self.screenshots,
"files_base_dir": self.ui_files_base_dir,
"screenshots_base_dir": self.ui_shots_base_dir
}
@api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
def generate(req: GenerateRequest) -> dict[str, Any]:
"""Triggers an AI generation request using the current project context."""
@@ -618,11 +523,6 @@ class App:
"""Placeholder for streaming AI generation responses (Not yet implemented)."""
raise HTTPException(status_code=501, detail="Streaming endpoint (/api/v1/stream) is not yet supported in this version.")
@api.get("/api/gui/token_stats", dependencies=[Depends(get_api_key)])
def token_stats() -> dict[str, Any]:
"""Returns current token budget stats for simulation/test verification."""
return dict(self._token_stats)
return api
# ---------------------------------------------------------------- project loading
@@ -1390,16 +1290,6 @@ class App:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
def fetch_stats():
try:
stats = ai_client.get_history_bleed_stats(md_content=md_content or self.last_md)
self._token_budget_pct = stats.get("percentage", 0.0) / 100.0
self._token_budget_current = stats.get("current", 0)
self._token_budget_limit = stats.get("limit", 0)
self._token_stats = stats
except Exception:
pass
threading.Thread(target=fetch_stats, daemon=True).start()
cache_stats = payload.get("cache_stats")
if cache_stats:
count = cache_stats.get("cache_count", 0)