Compare commits

...

10 Commits

40 changed files with 1012 additions and 969 deletions

View File

@@ -1,4 +1,4 @@
# Product Guidelines: Manual Slop # Product Guidelines: Manual Slop
## Documentation Style ## Documentation Style
@@ -16,3 +16,7 @@
- **Strict State Management:** There must be a rigorous separation between the Main GUI rendering thread and daemon execution threads. The UI should *never* hang during AI communication or script execution. Use lock-protected queues and events for synchronization. - **Strict State Management:** There must be a rigorous separation between the Main GUI rendering thread and daemon execution threads. The UI should *never* hang during AI communication or script execution. Use lock-protected queues and events for synchronization.
- **Comprehensive Logging:** Aggressively log all actions, API payloads, tool calls, and executed scripts. Maintain timestamped JSON-L and markdown logs to ensure total transparency and debuggability. - **Comprehensive Logging:** Aggressively log all actions, API payloads, tool calls, and executed scripts. Maintain timestamped JSON-L and markdown logs to ensure total transparency and debuggability.
- **Dependency Minimalism:** Limit external dependencies where possible. For instance, prefer standard library modules (like `urllib` and `html.parser` for web tools) over heavy third-party packages. - **Dependency Minimalism:** Limit external dependencies where possible. For instance, prefer standard library modules (like `urllib` and `html.parser` for web tools) over heavy third-party packages.
## AI-Optimized Compact Style
- **Indentation:** Exactly **1 space** per level. This minimizes token usage in nested structures.
- **Newlines:** Maximum **one (1)** blank line between top-level definitions. **Zero (0)** blank lines within function or method bodies.
- **Type Hinting:** Mandatory, strict type hints for all parameters, return types, and global variables to ensure high-signal context for AI agents.

View File

@@ -1,4 +1,4 @@
# Technology Stack: Manual Slop # Technology Stack: Manual Slop
## Core Language ## Core Language
@@ -27,6 +27,8 @@
## Configuration & Tooling ## Configuration & Tooling
- **ai_style_formatter.py:** Custom Python formatter specifically designed to enforce 1-space indentation and ultra-compact whitespace to minimize token consumption.
- **ast (Standard Library):** For deterministic AST parsing and automated generation of curated "Skeleton Views" (signatures and docstrings) to minimize context bloat for sub-agents. - **ast (Standard Library):** For deterministic AST parsing and automated generation of curated "Skeleton Views" (signatures and docstrings) to minimize context bloat for sub-agents.
- **pydantic / dataclasses:** For defining strict state schemas (Tracks, Tickets) used in linear orchestration. - **pydantic / dataclasses:** For defining strict state schemas (Tracks, Tickets) used in linear orchestration.
- **tomli-w:** For writing TOML configuration files. - **tomli-w:** For writing TOML configuration files.
@@ -45,3 +47,4 @@
- **Asynchronous Event Bus:** Employs an `AsyncEventQueue` based on `asyncio.Queue` to manage the communication between the UI and the backend multi-agent orchestrator without blocking. - **Asynchronous Event Bus:** Employs an `AsyncEventQueue` based on `asyncio.Queue` to manage the communication between the UI and the backend multi-agent orchestrator without blocking.
- **Synchronous IPC Approval Flow:** A specialized bridge mechanism that allows headless AI providers (like Gemini CLI) to synchronously request and receive human approval for tool calls via the GUI's REST API hooks. - **Synchronous IPC Approval Flow:** A specialized bridge mechanism that allows headless AI providers (like Gemini CLI) to synchronously request and receive human approval for tool calls via the GUI's REST API hooks.
- **Interface-Driven Development (IDD):** Enforces a "Stub-and-Resolve" pattern where cross-module dependencies are resolved by generating signatures/contracts before implementation. - **Interface-Driven Development (IDD):** Enforces a "Stub-and-Resolve" pattern where cross-module dependencies are resolved by generating signatures/contracts before implementation.

View File

@@ -1,4 +1,4 @@
# Project Tracks # Project Tracks
This file tracks all major tracks for the project. Each track has its own detailed plan in its respective folder. This file tracks all major tracks for the project. Each track has its own detailed plan in its respective folder.
@@ -11,3 +11,4 @@ This file tracks all major tracks for the project. Each track has its own detail
- [~] **Track: AI-Optimized Python Style Refactor** - [~] **Track: AI-Optimized Python Style Refactor**
*Link: [./tracks/python_style_refactor_20260227/](./tracks/python_style_refactor_20260227/)* *Link: [./tracks/python_style_refactor_20260227/](./tracks/python_style_refactor_20260227/)*

View File

@@ -21,15 +21,30 @@
- [x] Task: Conductor - Update `conductor/code_styleguides/python.md` with the new AI-optimized standard. [602cea6] - [x] Task: Conductor - Update `conductor/code_styleguides/python.md` with the new AI-optimized standard. [602cea6]
- [x] Task: Conductor - User Manual Verification 'Phase 3: Metadata and Final Documentation' (Protocol in workflow.md) - [x] Task: Conductor - User Manual Verification 'Phase 3: Metadata and Final Documentation' (Protocol in workflow.md)
## Phase 4: Codebase-Wide Type Hint Sweep ## Phase 4: Codebase-Wide Type Hint Sweep [checkpoint: 2907eb9]
- [x] Task: Conductor - Type hint pass on core modules (`api_hook_client.py`, `api_hooks.py`, `log_registry.py`, `performance_monitor.py`, `theme.py`, `theme_2.py`, `gemini_cli_adapter.py`, `multi_agent_conductor.py`, `dag_engine.py`, `events.py`, `file_cache.py`, `models.py`, `log_pruner.py`, `gemini.py`, `orchestrator_pm.py`, `conductor_tech_lead.py`, `outline_tool.py`, `summarize.py`). 46c2f9a - [x] Task: Conductor - Type hint pass on core modules (`api_hook_client.py`, `api_hooks.py`, `log_registry.py`, `performance_monitor.py`, `theme.py`, `theme_2.py`, `gemini_cli_adapter.py`, `multi_agent_conductor.py`, `dag_engine.py`, `events.py`, `file_cache.py`, `models.py`, `log_pruner.py`, `gemini.py`, `orchestrator_pm.py`, `conductor_tech_lead.py`, `outline_tool.py`, `summarize.py`). 46c2f9a
- [~] Task: Conductor - Type hint pass on remaining variable-only files (`ai_client.py` vars, `mcp_client.py` vars, `mma_prompts.py` vars) - [x] Task: Conductor - Type hint pass on remaining variable-only files (`ai_client.py` vars, `mcp_client.py` vars, `mma_prompts.py` vars) 8c5a560
- [x] Task: Conductor - Type hint pass on scripts (`scripts/*.py`) 53c2bbf - [x] Task: Conductor - Type hint pass on scripts (`scripts/*.py`) 53c2bbf
- [x] Task: Conductor - Type hint pass on simulation modules (`simulation/*.py`) ec91c90 - [x] Task: Conductor - Type hint pass on simulation modules (`simulation/*.py`) ec91c90
- [~] Task: Conductor - Type hint pass on test files (`tests/*.py`, `conductor/tests/*.py`) - [x] Task: Conductor - Type hint pass on test files (`tests/*.py`, `conductor/tests/*.py`) 7a0e8e6
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Codebase-Wide Type Hint Sweep' (Protocol in workflow.md) - [x] Task: Conductor - User Manual Verification 'Phase 4: Codebase-Wide Type Hint Sweep'
--- ---
**Protocol Note:** Each task will follow the Standard Task Workflow (Red/Green phases with Tier 3 Worker delegation). Phase completion will trigger the mandatory Verification and Checkpointing protocol. **Protocol Note:** Each task will follow the Standard Task Workflow (Red/Green phases with Tier 3 Worker delegation). Phase completion will trigger the mandatory Verification and Checkpointing protocol.
## Phase 5: Ultra-Compact Indentation Refactor (1-Space Indent) [checkpoint: 173ea96]
- [x] Task: Conductor - Implement automated conversion script for 4-space to 1-space indentation. 53c2bbf
- [x] Task: Conductor - Apply 1-space indent refactor to all `.py` files in the codebase. 173ea96
- [x] Task: Conductor - User Manual Verification 'Phase 5: Ultra-Compact Indentation Refactor'
## Phase 6: Test Suite Stabilization (High-Signal Reporting)
- [ ] Task: Conductor - Resolve API/Method name drift (e.g., `run_linear` -> `run` in ConductorEngine). Implement "Before vs After" state tables in `logs/test/`.
- [ ] Task: Conductor - Fix i_style_formatter.py test expectations for ultra-compact style.
- [ ] Task: Conductor - Align ier4_interceptor.py tests with current PowerShell output formatting.
- [ ] Task: Conductor - Investigate and stabilize live_gui test environment collection/startup.
- [ ] Task: Conductor - User Manual Verification 'Phase 6: Test Suite Stabilization'

View File

@@ -9,7 +9,7 @@ system_prompt = ""
[theme] [theme]
palette = "ImGui Dark" palette = "ImGui Dark"
font_size = 16.0 font_size = 16.0
scale = 1.2999999523162842 scale = 1.0
font_path = "" font_path = ""
[projects] [projects]

View File

@@ -102,6 +102,7 @@ class ConfirmDialog:
self._condition = threading.Condition() self._condition = threading.Condition()
self._done = False self._done = False
self._approved = False self._approved = False
def wait(self) -> tuple[bool, str]: def wait(self) -> tuple[bool, str]:
with self._condition: with self._condition:
while not self._done: while not self._done:
@@ -115,6 +116,7 @@ class MMAApprovalDialog:
self._condition = threading.Condition() self._condition = threading.Condition()
self._done = False self._done = False
self._approved = False self._approved = False
def wait(self) -> tuple[bool, str]: def wait(self) -> tuple[bool, str]:
with self._condition: with self._condition:
while not self._done: while not self._done:
@@ -131,6 +133,7 @@ class MMASpawnApprovalDialog:
self._done = False self._done = False
self._approved = False self._approved = False
self._abort = False self._abort = False
def wait(self) -> dict[str, Any]: def wait(self) -> dict[str, Any]:
with self._condition: with self._condition:
while not self._done: while not self._done:
@@ -293,6 +296,7 @@ class App:
def _prune_old_logs(self) -> None: def _prune_old_logs(self) -> None:
"""Asynchronously prunes old insignificant logs on startup.""" """Asynchronously prunes old insignificant logs on startup."""
def run_prune() -> None: def run_prune() -> None:
try: try:
registry = LogRegistry("logs/log_registry.toml") registry = LogRegistry("logs/log_registry.toml")
@@ -306,6 +310,7 @@ class App:
@property @property
def current_provider(self) -> str: def current_provider(self) -> str:
return self._current_provider return self._current_provider
@current_provider.setter @current_provider.setter
def current_provider(self, value: str) -> None: def current_provider(self, value: str) -> None:
if value != self._current_provider: if value != self._current_provider:
@@ -325,6 +330,7 @@ class App:
@property @property
def current_model(self) -> str: def current_model(self) -> str:
return self._current_model return self._current_model
@current_model.setter @current_model.setter
def current_model(self, value: str) -> None: def current_model(self, value: str) -> None:
if value != self._current_model: if value != self._current_model:
@@ -390,15 +396,18 @@ class App:
def create_api(self) -> FastAPI: def create_api(self) -> FastAPI:
"""Creates and configures the FastAPI application for headless mode.""" """Creates and configures the FastAPI application for headless mode."""
api = FastAPI(title="Manual Slop Headless API") api = FastAPI(title="Manual Slop Headless API")
class GenerateRequest(BaseModel): class GenerateRequest(BaseModel):
prompt: str prompt: str
auto_add_history: bool = True auto_add_history: bool = True
temperature: float | None = None temperature: float | None = None
max_tokens: int | None = None max_tokens: int | None = None
class ConfirmRequest(BaseModel): class ConfirmRequest(BaseModel):
approved: bool approved: bool
API_KEY_NAME = "X-API-KEY" API_KEY_NAME = "X-API-KEY"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def get_api_key(header_key: str = Depends(api_key_header)) -> str: async def get_api_key(header_key: str = Depends(api_key_header)) -> str:
"""Validates the API key from the request header against configuration.""" """Validates the API key from the request header against configuration."""
headless_cfg = self.config.get("headless", {}) headless_cfg = self.config.get("headless", {})
@@ -410,10 +419,12 @@ class App:
if header_key == target_key: if header_key == target_key:
return header_key return header_key
raise HTTPException(status_code=403, detail="Could not validate API Key") raise HTTPException(status_code=403, detail="Could not validate API Key")
@api.get("/health") @api.get("/health")
def health() -> dict[str, str]: def health() -> dict[str, str]:
"""Basic health check endpoint.""" """Basic health check endpoint."""
return {"status": "ok"} return {"status": "ok"}
@api.get("/status", dependencies=[Depends(get_api_key)]) @api.get("/status", dependencies=[Depends(get_api_key)])
def status() -> dict[str, Any]: def status() -> dict[str, Any]:
"""Returns the current status of the AI provider and active project.""" """Returns the current status of the AI provider and active project."""
@@ -424,6 +435,7 @@ class App:
"ai_status": self.ai_status, "ai_status": self.ai_status,
"session_usage": self.session_usage "session_usage": self.session_usage
} }
@api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)]) @api.get("/api/v1/pending_actions", dependencies=[Depends(get_api_key)])
def pending_actions() -> list[dict[str, Any]]: def pending_actions() -> list[dict[str, Any]]:
"""Lists all PowerShell scripts awaiting manual confirmation.""" """Lists all PowerShell scripts awaiting manual confirmation."""
@@ -442,6 +454,7 @@ class App:
"base_dir": self._pending_dialog._base_dir "base_dir": self._pending_dialog._base_dir
}) })
return actions return actions
@api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)]) @api.post("/api/v1/confirm/{action_id}", dependencies=[Depends(get_api_key)])
def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, Any]: def confirm_action(action_id: str, req: ConfirmRequest) -> dict[str, Any]:
"""Approves or denies a pending PowerShell script execution.""" """Approves or denies a pending PowerShell script execution."""
@@ -449,6 +462,7 @@ class App:
if not success: if not success:
raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found") raise HTTPException(status_code=404, detail=f"Action ID {action_id} not found")
return {"status": "success", "action_id": action_id, "approved": req.approved} return {"status": "success", "action_id": action_id, "approved": req.approved}
@api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)]) @api.get("/api/v1/sessions", dependencies=[Depends(get_api_key)])
def list_sessions() -> list[str]: def list_sessions() -> list[str]:
"""Lists all available session log files.""" """Lists all available session log files."""
@@ -456,6 +470,7 @@ class App:
if not log_dir.exists(): if not log_dir.exists():
return [] return []
return sorted([f.name for f in log_dir.glob("*.log")], reverse=True) return sorted([f.name for f in log_dir.glob("*.log")], reverse=True)
@api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)]) @api.get("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def get_session(filename: str) -> dict[str, str]: def get_session(filename: str) -> dict[str, str]:
"""Retrieves the content of a specific session log file.""" """Retrieves the content of a specific session log file."""
@@ -469,6 +484,7 @@ class App:
return {"filename": filename, "content": content} return {"filename": filename, "content": content}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)]) @api.delete("/api/v1/sessions/{filename}", dependencies=[Depends(get_api_key)])
def delete_session(filename: str) -> dict[str, str]: def delete_session(filename: str) -> dict[str, str]:
"""Deletes a specific session log file.""" """Deletes a specific session log file."""
@@ -482,6 +498,7 @@ class App:
return {"status": "success", "message": f"Deleted {filename}"} return {"status": "success", "message": f"Deleted {filename}"}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@api.get("/api/v1/context", dependencies=[Depends(get_api_key)]) @api.get("/api/v1/context", dependencies=[Depends(get_api_key)])
def get_context() -> dict[str, Any]: def get_context() -> dict[str, Any]:
"""Returns the current file and screenshot context configuration.""" """Returns the current file and screenshot context configuration."""
@@ -491,6 +508,7 @@ class App:
"files_base_dir": self.ui_files_base_dir, "files_base_dir": self.ui_files_base_dir,
"screenshots_base_dir": self.ui_shots_base_dir "screenshots_base_dir": self.ui_shots_base_dir
} }
@api.post("/api/v1/generate", dependencies=[Depends(get_api_key)]) @api.post("/api/v1/generate", dependencies=[Depends(get_api_key)])
def generate(req: GenerateRequest) -> dict[str, Any]: def generate(req: GenerateRequest) -> dict[str, Any]:
"""Triggers an AI generation request using the current project context.""" """Triggers an AI generation request using the current project context."""
@@ -547,6 +565,7 @@ class App:
raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}") raise HTTPException(status_code=502, detail=f"AI Provider Error: {e.ui_message()}")
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}") raise HTTPException(status_code=500, detail=f"In-flight AI request failure: {e}")
@api.post("/api/v1/stream", dependencies=[Depends(get_api_key)]) @api.post("/api/v1/stream", dependencies=[Depends(get_api_key)])
async def stream(req: GenerateRequest) -> Any: async def stream(req: GenerateRequest) -> Any:
"""Placeholder for streaming AI generation responses (Not yet implemented).""" """Placeholder for streaming AI generation responses (Not yet implemented)."""

View File

@@ -1,4 +1,4 @@
# gui.py # gui.py
""" """
Note(Gemini): Note(Gemini):
The main DearPyGui interface orchestrator. The main DearPyGui interface orchestrator.
@@ -17,8 +17,8 @@ import math
import sys import sys
import os import os
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Optional
from tkinter import filedialog, Tk from tkinter import filedialog, Tk
from typing import Optional, Callable
import aggregate import aggregate
import ai_client import ai_client
from ai_client import ProviderError from ai_client import ProviderError

View File

@@ -529,6 +529,7 @@ def py_find_usages(path: str, name: str) -> str:
import re import re
pattern = re.compile(r"\b" + re.escape(name) + r"\b") pattern = re.compile(r"\b" + re.escape(name) + r"\b")
results = [] results = []
def _search_file(fp): def _search_file(fp):
if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return
if not _is_allowed(fp): return if not _is_allowed(fp): return
@@ -541,7 +542,6 @@ def py_find_usages(path: str, name: str) -> str:
results.append(f"{rel}:{i}: {line.strip()[:100]}") results.append(f"{rel}:{i}: {line.strip()[:100]}")
except Exception: except Exception:
pass pass
if p.is_file(): if p.is_file():
_search_file(p) _search_file(p)
else: else:
@@ -550,7 +550,6 @@ def py_find_usages(path: str, name: str) -> str:
for file in files: for file in files:
if file.endswith(('.py', '.md', '.toml', '.txt', '.json')): if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
_search_file(Path(root) / file) _search_file(Path(root) / file)
if not results: if not results:
return f"No usages found for '{name}' in {p}" return f"No usages found for '{name}' in {p}"
if len(results) > 100: if len(results) > 100:
@@ -618,7 +617,6 @@ def py_get_hierarchy(path: str, class_name: str) -> str:
subclasses.append(f"{fp.name}: class {node.name}({base.value.id}.{class_name})") subclasses.append(f"{fp.name}: class {node.name}({base.value.id}.{class_name})")
except Exception: except Exception:
pass pass
try: try:
if p.is_file(): if p.is_file():
_search_file(p) _search_file(p)
@@ -628,7 +626,6 @@ def py_get_hierarchy(path: str, class_name: str) -> str:
for file in files: for file in files:
if file.endswith('.py'): if file.endswith('.py'):
_search_file(Path(root) / file) _search_file(Path(root) / file)
if not subclasses: if not subclasses:
return f"No subclasses of '{class_name}' found in {p}" return f"No subclasses of '{class_name}' found in {p}"
return f"Subclasses of '{class_name}':\n" + "\n".join(f" - {s}" for s in subclasses) return f"Subclasses of '{class_name}':\n" + "\n".join(f" - {s}" for s in subclasses)
@@ -647,7 +644,6 @@ def py_get_docstring(path: str, name: str) -> str:
if not name or name == "module": if not name or name == "module":
doc = ast.get_docstring(tree) doc = ast.get_docstring(tree)
return doc if doc else "No module docstring found." return doc if doc else "No module docstring found."
node = _get_symbol_node(tree, name) node = _get_symbol_node(tree, name)
if not node: return f"ERROR: could not find symbol '{name}' in {path}" if not node: return f"ERROR: could not find symbol '{name}' in {path}"
doc = ast.get_docstring(node) doc = ast.get_docstring(node)
@@ -660,9 +656,9 @@ def get_tree(path: str, max_depth: int = 2) -> str:
p, err = _resolve_and_check(path) p, err = _resolve_and_check(path)
if err: return err if err: return err
if not p.is_dir(): return f"ERROR: not a directory: {path}" if not p.is_dir(): return f"ERROR: not a directory: {path}"
try: try:
max_depth = int(max_depth) max_depth = int(max_depth)
def _build_tree(dir_path, current_depth, prefix=""): def _build_tree(dir_path, current_depth, prefix=""):
if current_depth > max_depth: return [] if current_depth > max_depth: return []
lines = [] lines = []
@@ -670,10 +666,8 @@ def get_tree(path: str, max_depth: int = 2) -> str:
entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
except PermissionError: except PermissionError:
return [] return []
# Filter # Filter
entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")] entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")]
for i, entry in enumerate(entries): for i, entry in enumerate(entries):
is_last = (i == len(entries) - 1) is_last = (i == len(entries) - 1)
connector = "└── " if is_last else "├── " connector = "└── " if is_last else "├── "
@@ -682,13 +676,11 @@ def get_tree(path: str, max_depth: int = 2) -> str:
extension = " " if is_last else "" extension = " " if is_last else ""
lines.extend(_build_tree(entry, current_depth + 1, prefix + extension)) lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
return lines return lines
tree_lines = [f"{p.name}/"] + _build_tree(p, 1) tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
return "\n".join(tree_lines) return "\n".join(tree_lines)
except Exception as e: except Exception as e:
return f"ERROR generating tree for '{path}': {e}" return f"ERROR generating tree for '{path}': {e}"
# ------------------------------------------------------------------ web tools
# ------------------------------------------------------------------ web tools
class _DDGParser(HTMLParser): class _DDGParser(HTMLParser):
def __init__(self) -> None: def __init__(self) -> None:
@@ -809,7 +801,7 @@ def get_ui_performance() -> str:
return f"ERROR: Failed to retrieve UI performance: {str(e)}" return f"ERROR: Failed to retrieve UI performance: {str(e)}"
# ------------------------------------------------------------------ tool dispatch # ------------------------------------------------------------------ tool dispatch
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "py_get_skeleton", "py_get_code_outline", "py_get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance", "get_file_slice", "set_file_slice", "py_update_definition", "py_get_signature", "py_set_signature", "py_get_class_summary", "py_get_var_declaration", "py_set_var_declaration", "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"} TOOL_NAMES: set[str] = {"read_file", "list_directory", "search_files", "get_file_summary", "py_get_skeleton", "py_get_code_outline", "py_get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance", "get_file_slice", "set_file_slice", "py_update_definition", "py_get_signature", "py_set_signature", "py_get_class_summary", "py_get_var_declaration", "py_set_var_declaration", "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"}
def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str: def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
""" """

View File

@@ -7,7 +7,7 @@ from typing import Dict
# --- Tier 1 (Strategic/Orchestration: PM) --- # --- Tier 1 (Strategic/Orchestration: PM) ---
TIER1_BASE_SYSTEM = """ TIER1_BASE_SYSTEM: str = """
You are the Tier 1 Orchestrator (Product Manager) for the Manual Slop project. You are the Tier 1 Orchestrator (Product Manager) for the Manual Slop project.
Your role is high-level strategic planning, architecture enforcement, and cross-module delegation. Your role is high-level strategic planning, architecture enforcement, and cross-module delegation.
You operate strictly on metadata, summaries, and executive-level directives. You operate strictly on metadata, summaries, and executive-level directives.
@@ -15,7 +15,7 @@ NEVER request or attempt to read raw implementation code unless specifically pro
Maintain a "Godot ECS Flat List format" (JSON array of objects) for structural outputs. Maintain a "Godot ECS Flat List format" (JSON array of objects) for structural outputs.
""" """
TIER1_EPIC_INIT = TIER1_BASE_SYSTEM + """ TIER1_EPIC_INIT: str = TIER1_BASE_SYSTEM + """
PATH: Epic Initialization (Project Planning) PATH: Epic Initialization (Project Planning)
GOAL: Break down a massive feature request into discrete Implementation Tracks. GOAL: Break down a massive feature request into discrete Implementation Tracks.
@@ -39,7 +39,7 @@ Return a JSON array of 'Tracks'. Each track object must follow the Godot ECS Fla
] ]
""" """
TIER1_TRACK_DELEGATION = TIER1_BASE_SYSTEM + """ TIER1_TRACK_DELEGATION: str = TIER1_BASE_SYSTEM + """
PATH: Track Delegation (Sprint Kickoff) PATH: Track Delegation (Sprint Kickoff)
GOAL: Compile a 'Track Brief' for a Tier 2 Tech Lead. GOAL: Compile a 'Track Brief' for a Tier 2 Tech Lead.
@@ -54,7 +54,7 @@ Generate a comprehensive 'Track Brief' (JSON or Markdown) which includes:
3. Explicit architectural constraints derived from the Skeleton View. 3. Explicit architectural constraints derived from the Skeleton View.
""" """
TIER1_MACRO_MERGE = TIER1_BASE_SYSTEM + """ TIER1_MACRO_MERGE: str = TIER1_BASE_SYSTEM + """
PATH: Macro-Merge & Acceptance Review PATH: Macro-Merge & Acceptance Review
GOAL: Review high-severity changes and merge into the project history. GOAL: Review high-severity changes and merge into the project history.
@@ -69,14 +69,14 @@ If Rejected, provide specific architectural feedback focusing on integration bre
# --- Tier 2 (Architectural/Tech Lead: Conductor) --- # --- Tier 2 (Architectural/Tech Lead: Conductor) ---
TIER2_BASE_SYSTEM = """ TIER2_BASE_SYSTEM: str = """
You are the Tier 2 Track Conductor (Tech Lead) for the Manual Slop project. You are the Tier 2 Track Conductor (Tech Lead) for the Manual Slop project.
Your role is module-specific planning, code review, and worker management. Your role is module-specific planning, code review, and worker management.
You bridge high-level architecture with code syntax using AST-aware Skeleton Views. You bridge high-level architecture with code syntax using AST-aware Skeleton Views.
Enforce Interface-Driven Development (IDD) and manage Topological Dependency Graphs. Enforce Interface-Driven Development (IDD) and manage Topological Dependency Graphs.
""" """
TIER2_SPRINT_PLANNING = TIER2_BASE_SYSTEM + """ TIER2_SPRINT_PLANNING: str = TIER2_BASE_SYSTEM + """
PATH: Sprint Planning (Task Delegation) PATH: Sprint Planning (Task Delegation)
GOAL: Break down a Track Brief into discrete Tier 3 Tickets. GOAL: Break down a Track Brief into discrete Tier 3 Tickets.
@@ -101,7 +101,7 @@ Include 'depends_on' pointers to construct an execution DAG (Directed Acyclic Gr
] ]
""" """
TIER2_CODE_REVIEW = TIER2_BASE_SYSTEM + """ TIER2_CODE_REVIEW: str = TIER2_BASE_SYSTEM + """
PATH: Code Review (Local Integration) PATH: Code Review (Local Integration)
GOAL: Review Tier 3 diffs and ensure they meet the Ticket's goals. GOAL: Review Tier 3 diffs and ensure they meet the Ticket's goals.
@@ -113,7 +113,7 @@ OUTPUT REQUIREMENT:
Return "Approve" (merges diff) OR "Reject" (sends technical critique back to Tier 3). Return "Approve" (merges diff) OR "Reject" (sends technical critique back to Tier 3).
""" """
TIER2_TRACK_FINALIZATION = TIER2_BASE_SYSTEM + """ TIER2_TRACK_FINALIZATION: str = TIER2_BASE_SYSTEM + """
PATH: Track Finalization (Upward Reporting) PATH: Track Finalization (Upward Reporting)
GOAL: Summarize the completed Track for the Tier 1 PM. GOAL: Summarize the completed Track for the Tier 1 PM.
@@ -125,7 +125,7 @@ OUTPUT REQUIREMENT:
Provide an Executive Summary (~200 words) and the final Macro-Diff. Provide an Executive Summary (~200 words) and the final Macro-Diff.
""" """
TIER2_CONTRACT_FIRST = TIER2_BASE_SYSTEM + """ TIER2_CONTRACT_FIRST: str = TIER2_BASE_SYSTEM + """
PATH: Contract-First Delegation (Stub-and-Resolve) PATH: Contract-First Delegation (Stub-and-Resolve)
GOAL: Resolve cross-module dependencies via Interface-Driven Development (IDD). GOAL: Resolve cross-module dependencies via Interface-Driven Development (IDD).

View File

@@ -8,5 +8,5 @@ active = "main"
[discussions.main] [discussions.main]
git_commit = "" git_commit = ""
last_updated = "2026-02-27T22:56:03" last_updated = "2026-02-28T19:35:01"
history = [] history = []

View File

@@ -16,14 +16,17 @@ from pathlib import Path
if TYPE_CHECKING: if TYPE_CHECKING:
from models import TrackState from models import TrackState
TS_FMT: str = "%Y-%m-%dT%H:%M:%S" TS_FMT: str = "%Y-%m-%dT%H:%M:%S"
def now_ts() -> str: def now_ts() -> str:
return datetime.datetime.now().strftime(TS_FMT) return datetime.datetime.now().strftime(TS_FMT)
def parse_ts(s: str) -> Optional[datetime.datetime]: def parse_ts(s: str) -> Optional[datetime.datetime]:
try: try:
return datetime.datetime.strptime(s, TS_FMT) return datetime.datetime.strptime(s, TS_FMT)
except Exception: except Exception:
return None return None
# ── entry serialisation ────────────────────────────────────────────────────── # ── entry serialisation ──────────────────────────────────────────────────────
def entry_to_str(entry: dict[str, Any]) -> str: def entry_to_str(entry: dict[str, Any]) -> str:
"""Serialise a disc entry dict -> stored string.""" """Serialise a disc entry dict -> stored string."""
ts = entry.get("ts", "") ts = entry.get("ts", "")
@@ -32,6 +35,7 @@ def entry_to_str(entry: dict[str, Any]) -> str:
if ts: if ts:
return f"@{ts}\n{role}:\n{content}" return f"@{ts}\n{role}:\n{content}"
return f"{role}:\n{content}" return f"{role}:\n{content}"
def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]: def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]:
"""Parse a stored string back to a disc entry dict.""" """Parse a stored string back to a disc entry dict."""
ts = "" ts = ""
@@ -56,7 +60,8 @@ def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]:
matched_role = next((r for r in known if r.lower() == raw_role.lower()), raw_role) matched_role = next((r for r in known if r.lower() == raw_role.lower()), raw_role)
content = parts[1].strip() if len(parts) > 1 else "" content = parts[1].strip() if len(parts) > 1 else ""
return {"role": matched_role, "content": content, "collapsed": False, "ts": ts} return {"role": matched_role, "content": content, "collapsed": False, "ts": ts}
# ── git helpers ────────────────────────────────────────────────────────────── # ── git helpers ──────────────────────────────────────────────────────────────
def get_git_commit(git_dir: str) -> str: def get_git_commit(git_dir: str) -> str:
try: try:
r = subprocess.run( r = subprocess.run(
@@ -66,6 +71,7 @@ def get_git_commit(git_dir: str) -> str:
return r.stdout.strip() if r.returncode == 0 else "" return r.stdout.strip() if r.returncode == 0 else ""
except Exception: except Exception:
return "" return ""
def get_git_log(git_dir: str, n: int = 5) -> str: def get_git_log(git_dir: str, n: int = 5) -> str:
try: try:
r = subprocess.run( r = subprocess.run(
@@ -75,9 +81,11 @@ def get_git_log(git_dir: str, n: int = 5) -> str:
return r.stdout.strip() if r.returncode == 0 else "" return r.stdout.strip() if r.returncode == 0 else ""
except Exception: except Exception:
return "" return ""
# ── default structures ─────────────────────────────────────────────────────── # ── default structures ───────────────────────────────────────────────────────
def default_discussion() -> dict[str, Any]: def default_discussion() -> dict[str, Any]:
return {"git_commit": "", "last_updated": now_ts(), "history": []} return {"git_commit": "", "last_updated": now_ts(), "history": []}
def default_project(name: str = "unnamed") -> dict[str, Any]: def default_project(name: str = "unnamed") -> dict[str, Any]:
return { return {
"project": {"name": name, "git_dir": "", "system_prompt": "", "main_context": ""}, "project": {"name": name, "git_dir": "", "system_prompt": "", "main_context": ""},
@@ -108,11 +116,13 @@ def default_project(name: str = "unnamed") -> dict[str, Any]:
"tracks": [] "tracks": []
} }
} }
# ── load / save ────────────────────────────────────────────────────────────── # ── load / save ──────────────────────────────────────────────────────────────
def get_history_path(project_path: Union[str, Path]) -> Path: def get_history_path(project_path: Union[str, Path]) -> Path:
"""Return the Path to the sibling history TOML file for a given project.""" """Return the Path to the sibling history TOML file for a given project."""
p = Path(project_path) p = Path(project_path)
return p.parent / f"{p.stem}_history.toml" return p.parent / f"{p.stem}_history.toml"
def load_project(path: Union[str, Path]) -> dict[str, Any]: def load_project(path: Union[str, Path]) -> dict[str, Any]:
""" """
Load a project TOML file. Load a project TOML file.
@@ -131,6 +141,7 @@ def load_project(path: Union[str, Path]) -> dict[str, Any]:
if hist_path.exists(): if hist_path.exists():
proj["discussion"] = load_history(path) proj["discussion"] = load_history(path)
return proj return proj
def load_history(project_path: Union[str, Path]) -> dict[str, Any]: def load_history(project_path: Union[str, Path]) -> dict[str, Any]:
"""Load the segregated discussion history from its dedicated TOML file.""" """Load the segregated discussion history from its dedicated TOML file."""
hist_path = get_history_path(project_path) hist_path = get_history_path(project_path)
@@ -138,6 +149,7 @@ def load_history(project_path: Union[str, Path]) -> dict[str, Any]:
with open(hist_path, "rb") as f: with open(hist_path, "rb") as f:
return tomllib.load(f) return tomllib.load(f)
return {} return {}
def clean_nones(data: Any) -> Any: def clean_nones(data: Any) -> Any:
"""Recursively remove None values from a dictionary/list.""" """Recursively remove None values from a dictionary/list."""
if isinstance(data, dict): if isinstance(data, dict):
@@ -145,6 +157,7 @@ def clean_nones(data: Any) -> Any:
elif isinstance(data, list): elif isinstance(data, list):
return [clean_nones(v) for v in data if v is not None] return [clean_nones(v) for v in data if v is not None]
return data return data
def save_project(proj: dict[str, Any], path: Union[str, Path], disc_data: Optional[dict[str, Any]] = None) -> None: def save_project(proj: dict[str, Any], path: Union[str, Path], disc_data: Optional[dict[str, Any]] = None) -> None:
""" """
Save the project TOML. Save the project TOML.
@@ -163,7 +176,8 @@ def save_project(proj: dict[str, Any], path: Union[str, Path], disc_data: Option
hist_path = get_history_path(path) hist_path = get_history_path(path)
with open(hist_path, "wb") as f: with open(hist_path, "wb") as f:
tomli_w.dump(disc_data, f) tomli_w.dump(disc_data, f)
# ── migration helper ───────────────────────────────────────────────────────── # ── migration helper ─────────────────────────────────────────────────────────
def migrate_from_legacy_config(cfg: dict[str, Any]) -> dict[str, Any]: def migrate_from_legacy_config(cfg: dict[str, Any]) -> dict[str, Any]:
"""Build a fresh project dict from a legacy flat config.toml. Does NOT save.""" """Build a fresh project dict from a legacy flat config.toml. Does NOT save."""
name = cfg.get("output", {}).get("namespace", "project") name = cfg.get("output", {}).get("namespace", "project")
@@ -177,7 +191,8 @@ def migrate_from_legacy_config(cfg: dict[str, Any]) -> dict[str, Any]:
main_disc["history"] = disc.get("history", []) main_disc["history"] = disc.get("history", [])
main_disc["last_updated"] = now_ts() main_disc["last_updated"] = now_ts()
return proj return proj
# ── flat config for aggregate.run() ───────────────────────────────────────── # ── flat config for aggregate.run() ─────────────────────────────────────────
def flat_config(proj: dict[str, Any], disc_name: Optional[str] = None, track_id: Optional[str] = None) -> dict[str, Any]: def flat_config(proj: dict[str, Any], disc_name: Optional[str] = None, track_id: Optional[str] = None) -> dict[str, Any]:
"""Return a flat config dict compatible with aggregate.run().""" """Return a flat config dict compatible with aggregate.run()."""
disc_sec = proj.get("discussion", {}) disc_sec = proj.get("discussion", {})
@@ -197,7 +212,8 @@ def flat_config(proj: dict[str, Any], disc_name: Optional[str] = None, track_id:
"history": history, "history": history,
}, },
} }
# ── track state persistence ───────────────────────────────────────────────── # ── track state persistence ─────────────────────────────────────────────────
def save_track_state(track_id: str, state: 'TrackState', base_dir: Union[str, Path] = ".") -> None: def save_track_state(track_id: str, state: 'TrackState', base_dir: Union[str, Path] = ".") -> None:
""" """
Saves a TrackState object to conductor/tracks/<track_id>/state.toml. Saves a TrackState object to conductor/tracks/<track_id>/state.toml.
@@ -208,6 +224,7 @@ def save_track_state(track_id: str, state: 'TrackState', base_dir: Union[str, Pa
data = clean_nones(state.to_dict()) data = clean_nones(state.to_dict())
with open(state_file, "wb") as f: with open(state_file, "wb") as f:
tomli_w.dump(data, f) tomli_w.dump(data, f)
def load_track_state(track_id: str, base_dir: Union[str, Path] = ".") -> Optional['TrackState']: def load_track_state(track_id: str, base_dir: Union[str, Path] = ".") -> Optional['TrackState']:
""" """
Loads a TrackState object from conductor/tracks/<track_id>/state.toml. Loads a TrackState object from conductor/tracks/<track_id>/state.toml.
@@ -219,6 +236,7 @@ def load_track_state(track_id: str, base_dir: Union[str, Path] = ".") -> Optiona
with open(state_file, "rb") as f: with open(state_file, "rb") as f:
data = tomllib.load(f) data = tomllib.load(f)
return TrackState.from_dict(data) return TrackState.from_dict(data)
def load_track_history(track_id: str, base_dir: Union[str, Path] = ".") -> list[str]: def load_track_history(track_id: str, base_dir: Union[str, Path] = ".") -> list[str]:
""" """
Loads the discussion history for a specific track from its state.toml. Loads the discussion history for a specific track from its state.toml.
@@ -236,6 +254,7 @@ def load_track_history(track_id: str, base_dir: Union[str, Path] = ".") -> list[
e["ts"] = ts.strftime(TS_FMT) e["ts"] = ts.strftime(TS_FMT)
history.append(entry_to_str(e)) history.append(entry_to_str(e))
return history return history
def save_track_history(track_id: str, history: list[str], base_dir: Union[str, Path] = ".") -> None: def save_track_history(track_id: str, history: list[str], base_dir: Union[str, Path] = ".") -> None:
""" """
Saves the discussion history for a specific track to its state.toml. Saves the discussion history for a specific track to its state.toml.
@@ -249,6 +268,7 @@ def save_track_history(track_id: str, history: list[str], base_dir: Union[str, P
entries = [str_to_entry(h, roles) for h in history] entries = [str_to_entry(h, roles) for h in history]
state.discussion = entries state.discussion = entries
save_track_state(track_id, state, base_dir) save_track_state(track_id, state, base_dir)
def get_all_tracks(base_dir: Union[str, Path] = ".") -> list[dict[str, Any]]: def get_all_tracks(base_dir: Union[str, Path] = ".") -> list[dict[str, Any]]:
""" """
Scans the conductor/tracks/ directory and returns a list of dictionaries Scans the conductor/tracks/ directory and returns a list of dictionaries

31
scan_report.txt Normal file
View File

@@ -0,0 +1,31 @@
Files with untyped items: 25
File NoRet Params Vars Total
-------------------------------------------------------------------------------------
./debug_ast.py 1 2 4 7
./tests/visual_mma_verification.py 0 0 4 4
./debug_ast_2.py 0 0 3 3
./scripts/cli_tool_bridge.py 1 0 1 2
./scripts/mcp_server.py 0 0 2 2
./tests/test_gui_diagnostics.py 0 0 2 2
./tests/test_gui_updates.py 0 0 2 2
./tests/test_layout_reorganization.py 0 0 2 2
./scripts/check_hints.py 0 0 1 1
./scripts/check_hints_v2.py 0 0 1 1
./scripts/claude_tool_bridge.py 0 0 1 1
./scripts/type_hint_scanner.py 1 0 0 1
./tests/mock_alias_tool.py 0 0 1 1
./tests/test_gemini_cli_adapter_parity.py 0 0 1 1
./tests/test_gui2_parity.py 0 0 1 1
./tests/test_gui2_performance.py 0 0 1 1
./tests/test_gui_performance_requirements.py 0 1 0 1
./tests/test_gui_stress_performance.py 0 1 0 1
./tests/test_hooks.py 0 1 0 1
./tests/test_live_workflow.py 0 1 0 1
./tests/test_track_state_persistence.py 0 1 0 1
./tests/verify_mma_gui_robust.py 0 0 1 1
./tests/visual_diag.py 0 0 1 1
./tests/visual_orchestration_verification.py 0 1 0 1
./tests/visual_sim_mma_v2.py 0 1 0 1
-------------------------------------------------------------------------------------
TOTAL 41

View File

@@ -29,6 +29,7 @@ def has_value_return(node: ast.AST) -> bool:
def collect_auto_none(tree: ast.Module) -> list[tuple[str, ast.AST]]: def collect_auto_none(tree: ast.Module) -> list[tuple[str, ast.AST]]:
"""Collect functions that can safely get -> None annotation.""" """Collect functions that can safely get -> None annotation."""
results = [] results = []
def scan(scope, prefix=""): def scan(scope, prefix=""):
for node in ast.iter_child_nodes(scope): for node in ast.iter_child_nodes(scope):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
@@ -111,11 +112,10 @@ def apply_return_none_single_pass(filepath: str) -> int:
with open(fp, 'w', encoding='utf-8', newline='') as f: with open(fp, 'w', encoding='utf-8', newline='') as f:
f.writelines(lines) f.writelines(lines)
return count return count
# --- Manual signature replacements ---
# --- Manual signature replacements --- # These use regex on the def line to do a targeted replacement.
# These use regex on the def line to do a targeted replacement. # Each entry: (dotted_name, old_params_pattern, new_full_sig_line)
# Each entry: (dotted_name, old_params_pattern, new_full_sig_line) # We match by finding the exact def line and replacing it.
# We match by finding the exact def line and replacing it.
def apply_manual_sigs(filepath: str, sig_replacements: list[tuple[str, str]]) -> int: def apply_manual_sigs(filepath: str, sig_replacements: list[tuple[str, str]]) -> int:
"""Apply manual signature replacements. """Apply manual signature replacements.
@@ -164,10 +164,9 @@ def verify_syntax(filepath: str) -> str:
return f"Syntax OK: {filepath}" return f"Syntax OK: {filepath}"
except SyntaxError as e: except SyntaxError as e:
return f"SyntaxError in {filepath} at line {e.lineno}: {e.msg}" return f"SyntaxError in {filepath} at line {e.lineno}: {e.msg}"
# ============================================================
# ============================================================ # gui_2.py manual signatures (Tier 3 items)
# gui_2.py manual signatures (Tier 3 items) # ============================================================
# ============================================================
GUI2_MANUAL_SIGS: list[tuple[str, str]] = [ GUI2_MANUAL_SIGS: list[tuple[str, str]] = [
(r'def resolve_pending_action\(self, action_id: str, approved: bool\):', (r'def resolve_pending_action\(self, action_id: str, approved: bool\):',
r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'), r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'),
@@ -281,7 +280,6 @@ if __name__ == "__main__":
n = apply_return_none_single_pass("gui_legacy.py") n = apply_return_none_single_pass("gui_legacy.py")
stats["auto_none"] += n stats["auto_none"] += n
print(f" gui_legacy.py: {n} applied") print(f" gui_legacy.py: {n} applied")
# Verify syntax after Phase A # Verify syntax after Phase A
for f in ["gui_2.py", "gui_legacy.py"]: for f in ["gui_2.py", "gui_legacy.py"]:
r = verify_syntax(f) r = verify_syntax(f)
@@ -289,7 +287,6 @@ if __name__ == "__main__":
print(f" ABORT: {r}") print(f" ABORT: {r}")
sys.exit(1) sys.exit(1)
print(" Syntax OK after Phase A") print(" Syntax OK after Phase A")
print("\n=== Phase B: Manual signatures (regex) ===") print("\n=== Phase B: Manual signatures (regex) ===")
n = apply_manual_sigs("gui_2.py", GUI2_MANUAL_SIGS) n = apply_manual_sigs("gui_2.py", GUI2_MANUAL_SIGS)
stats["manual_sig"] += n stats["manual_sig"] += n
@@ -297,7 +294,6 @@ if __name__ == "__main__":
n = apply_manual_sigs("gui_legacy.py", LEGACY_MANUAL_SIGS) n = apply_manual_sigs("gui_legacy.py", LEGACY_MANUAL_SIGS)
stats["manual_sig"] += n stats["manual_sig"] += n
print(f" gui_legacy.py: {n} applied") print(f" gui_legacy.py: {n} applied")
# Verify syntax after Phase B # Verify syntax after Phase B
for f in ["gui_2.py", "gui_legacy.py"]: for f in ["gui_2.py", "gui_legacy.py"]:
r = verify_syntax(f) r = verify_syntax(f)
@@ -305,9 +301,9 @@ if __name__ == "__main__":
print(f" ABORT: {r}") print(f" ABORT: {r}")
sys.exit(1) sys.exit(1)
print(" Syntax OK after Phase B") print(" Syntax OK after Phase B")
print("\n=== Phase C: Variable annotations (regex) ===") print("\n=== Phase C: Variable annotations (regex) ===")
# Use re.MULTILINE so ^ matches line starts # Use re.MULTILINE so ^ matches line starts
def apply_var_replacements_m(filepath, replacements): def apply_var_replacements_m(filepath, replacements):
fp = abs_path(filepath) fp = abs_path(filepath)
with open(fp, 'r', encoding='utf-8') as f: with open(fp, 'r', encoding='utf-8') as f:
@@ -323,14 +319,12 @@ if __name__ == "__main__":
with open(fp, 'w', encoding='utf-8', newline='') as f: with open(fp, 'w', encoding='utf-8', newline='') as f:
f.write(code) f.write(code)
return count return count
n = apply_var_replacements_m("gui_2.py", GUI2_VAR_REPLACEMENTS) n = apply_var_replacements_m("gui_2.py", GUI2_VAR_REPLACEMENTS)
stats["vars"] += n stats["vars"] += n
print(f" gui_2.py: {n} applied") print(f" gui_2.py: {n} applied")
n = apply_var_replacements_m("gui_legacy.py", LEGACY_VAR_REPLACEMENTS) n = apply_var_replacements_m("gui_legacy.py", LEGACY_VAR_REPLACEMENTS)
stats["vars"] += n stats["vars"] += n
print(f" gui_legacy.py: {n} applied") print(f" gui_legacy.py: {n} applied")
print("\n=== Final Syntax Verification ===") print("\n=== Final Syntax Verification ===")
all_ok = True all_ok = True
for f in ["gui_2.py", "gui_legacy.py"]: for f in ["gui_2.py", "gui_legacy.py"]:
@@ -338,7 +332,6 @@ if __name__ == "__main__":
print(f" {f}: {r}") print(f" {f}: {r}")
if "Error" in r: if "Error" in r:
all_ok = False all_ok = False
print(f"\n=== Summary ===") print(f"\n=== Summary ===")
print(f" Auto -> None: {stats['auto_none']}") print(f" Auto -> None: {stats['auto_none']}")
print(f" Manual sigs: {stats['manual_sig']}") print(f" Manual sigs: {stats['manual_sig']}")

View File

@@ -21,7 +21,6 @@ MODEL_MAP: dict[str, str] = {
'tier4': 'claude-haiku-4-5', 'tier4': 'claude-haiku-4-5',
} }
def generate_skeleton(code: str) -> str: def generate_skeleton(code: str) -> str:
""" """
Parses Python code and replaces function/method bodies with '...', Parses Python code and replaces function/method bodies with '...',
@@ -60,7 +59,6 @@ def generate_skeleton(code: str) -> str:
edits.append((start_byte, end_byte, "...")) edits.append((start_byte, end_byte, "..."))
for child in node.children: for child in node.children:
walk(child) walk(child)
walk(tree.root_node) walk(tree.root_node)
edits.sort(key=lambda x: x[0], reverse=True) edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8") code_bytes = bytearray(code, "utf8")
@@ -70,12 +68,10 @@ def generate_skeleton(code: str) -> str:
except Exception as e: except Exception as e:
return f"# Error generating skeleton: {e}\n{code}" return f"# Error generating skeleton: {e}\n{code}"
def get_model_for_role(role: str) -> str: def get_model_for_role(role: str) -> str:
"""Returns the Claude model to use for a given tier role.""" """Returns the Claude model to use for a given tier role."""
return MODEL_MAP.get(role, 'claude-haiku-4-5') return MODEL_MAP.get(role, 'claude-haiku-4-5')
def get_role_documents(role: str) -> list[str]: def get_role_documents(role: str) -> list[str]:
if role in ('tier1-orchestrator', 'tier1'): if role in ('tier1-orchestrator', 'tier1'):
return ['conductor/product.md', 'conductor/product-guidelines.md'] return ['conductor/product.md', 'conductor/product-guidelines.md']
@@ -85,7 +81,6 @@ def get_role_documents(role: str) -> list[str]:
return ['conductor/workflow.md'] return ['conductor/workflow.md']
return [] return []
def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str: def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str:
os.makedirs('logs/claude_agents', exist_ok=True) os.makedirs('logs/claude_agents', exist_ok=True)
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
@@ -106,7 +101,6 @@ def log_delegation(role: str, full_prompt: str, result: str | None = None, summa
f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n") f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
return log_file return log_file
def get_dependencies(filepath: str) -> list[str]: def get_dependencies(filepath: str) -> list[str]:
"""Identify top-level module imports from a Python file.""" """Identify top-level module imports from a Python file."""
try: try:
@@ -131,14 +125,11 @@ def get_dependencies(filepath: str) -> list[str]:
print(f"Error getting dependencies for {filepath}: {e}") print(f"Error getting dependencies for {filepath}: {e}")
return [] return []
def execute_agent(role: str, prompt: str, docs: list[str]) -> str: def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
model = get_model_for_role(role) model = get_model_for_role(role)
# Advanced Context: Dependency skeletons for Tier 3 # Advanced Context: Dependency skeletons for Tier 3
injected_context = "" injected_context = ""
UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate'] UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate']
if role in ['tier3', 'tier3-worker']: if role in ['tier3', 'tier3-worker']:
for doc in docs: for doc in docs:
if doc.endswith('.py') and os.path.exists(doc): if doc.endswith('.py') and os.path.exists(doc):
@@ -161,7 +152,6 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
print(f"Error gathering context for {dep_file}: {e}") print(f"Error gathering context for {dep_file}: {e}")
if len(injected_context) > 15000: if len(injected_context) > 15000:
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
# MMA Protocol: Tier 3 and 4 are stateless. Build system directive. # MMA Protocol: Tier 3 and 4 are stateless. Build system directive.
if role in ['tier3', 'tier3-worker']: if role in ['tier3', 'tier3-worker']:
system_directive = ( system_directive = (
@@ -186,9 +176,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. "
"ONLY output the requested text. No pleasantries." "ONLY output the requested text. No pleasantries."
) )
command_text = f"{system_directive}\n\n{injected_context}\n\n" command_text = f"{system_directive}\n\n{injected_context}\n\n"
# Inline documents to ensure sub-agent has context in headless mode # Inline documents to ensure sub-agent has context in headless mode
for doc in docs: for doc in docs:
if os.path.exists(doc): if os.path.exists(doc):
@@ -198,9 +186,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
except Exception as e: except Exception as e:
print(f"Error inlining {doc}: {e}") print(f"Error inlining {doc}: {e}")
command_text += f"\n\nTASK: {prompt}\n\n" command_text += f"\n\nTASK: {prompt}\n\n"
# Spawn claude CLI non-interactively via PowerShell # Spawn claude CLI non-interactively via PowerShell
ps_command = ( ps_command = (
"if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') " "if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') "
@@ -208,7 +194,6 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
f"claude --model {model} --print" f"claude --model {model} --print"
) )
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
try: try:
env = os.environ.copy() env = os.environ.copy()
env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless' env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless'
@@ -230,7 +215,6 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
log_delegation(role, command_text, err_msg) log_delegation(role, command_text, err_msg)
return err_msg return err_msg
def create_parser() -> argparse.ArgumentParser: def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Claude MMA Execution Script") parser = argparse.ArgumentParser(description="Claude MMA Execution Script")
parser.add_argument( parser.add_argument(
@@ -252,38 +236,31 @@ def create_parser() -> argparse.ArgumentParser:
) )
return parser return parser
def main() -> None: def main() -> None:
parser = create_parser() parser = create_parser()
args = parser.parse_args() args = parser.parse_args()
role = args.role role = args.role
prompt = args.prompt prompt = args.prompt
docs = [] docs = []
if args.task_file and os.path.exists(args.task_file): if args.task_file and os.path.exists(args.task_file):
with open(args.task_file, "rb") as f: with open(args.task_file, "rb") as f:
task_data = tomllib.load(f) task_data = tomllib.load(f)
role = task_data.get("role", role) role = task_data.get("role", role)
prompt = task_data.get("prompt", prompt) prompt = task_data.get("prompt", prompt)
docs = task_data.get("docs", []) docs = task_data.get("docs", [])
if not role or not prompt: if not role or not prompt:
parser.print_help() parser.print_help()
return return
if not docs: if not docs:
docs = get_role_documents(role) docs = get_role_documents(role)
# Extract @file references from the prompt # Extract @file references from the prompt
file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt) file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt)
for ref in file_refs: for ref in file_refs:
if os.path.exists(ref) and ref not in docs: if os.path.exists(ref) and ref not in docs:
docs.append(ref) docs.append(ref)
print(f"Executing role: {role} with docs: {docs}") print(f"Executing role: {role} with docs: {docs}")
result = execute_agent(role, prompt, docs) result = execute_agent(role, prompt, docs)
print(result) print(result)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -14,7 +14,6 @@ except ImportError:
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr) print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
sys.exit(1) sys.exit(1)
def main() -> None: def main() -> None:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
logging.debug("Claude Tool Bridge script started.") logging.debug("Claude Tool Bridge script started.")
@@ -30,38 +29,30 @@ def main() -> None:
logging.error("Failed to decode JSON from stdin.") logging.error("Failed to decode JSON from stdin.")
print(json.dumps({"decision": "deny", "reason": "Invalid JSON received from stdin."})) print(json.dumps({"decision": "deny", "reason": "Invalid JSON received from stdin."}))
return return
# Claude Code PreToolUse hook format: tool_name + tool_input # Claude Code PreToolUse hook format: tool_name + tool_input
tool_name = hook_input.get('tool_name') tool_name = hook_input.get('tool_name')
tool_input = hook_input.get('tool_input', {}) tool_input = hook_input.get('tool_input', {})
if tool_name is None: if tool_name is None:
logging.error("Could not determine tool name from input. Expected 'tool_name'.") logging.error("Could not determine tool name from input. Expected 'tool_name'.")
print(json.dumps({"decision": "deny", "reason": "Missing 'tool_name' in hook input."})) print(json.dumps({"decision": "deny", "reason": "Missing 'tool_name' in hook input."}))
return return
if not isinstance(tool_input, dict): if not isinstance(tool_input, dict):
logging.warning(f"tool_input is not a dict: {tool_input}. Treating as empty.") logging.warning(f"tool_input is not a dict: {tool_input}. Treating as empty.")
tool_input = {} tool_input = {}
logging.debug(f"Resolved tool_name: '{tool_name}', tool_input: {tool_input}") logging.debug(f"Resolved tool_name: '{tool_name}', tool_input: {tool_input}")
# Check context — if not running via Manual Slop, pass through # Check context — if not running via Manual Slop, pass through
hook_context = os.environ.get("CLAUDE_CLI_HOOK_CONTEXT") hook_context = os.environ.get("CLAUDE_CLI_HOOK_CONTEXT")
logging.debug(f"Checking CLAUDE_CLI_HOOK_CONTEXT: '{hook_context}'") logging.debug(f"Checking CLAUDE_CLI_HOOK_CONTEXT: '{hook_context}'")
if hook_context == 'mma_headless': if hook_context == 'mma_headless':
# Sub-agents in headless MMA mode: auto-allow all tools # Sub-agents in headless MMA mode: auto-allow all tools
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'mma_headless'. Allowing for sub-agent.") logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'mma_headless'. Allowing for sub-agent.")
print(json.dumps({"decision": "allow", "reason": "Sub-agent headless mode (MMA)."})) print(json.dumps({"decision": "allow", "reason": "Sub-agent headless mode (MMA)."}))
return return
if hook_context != 'manual_slop': if hook_context != 'manual_slop':
# Not a programmatic Manual Slop session — allow through silently # Not a programmatic Manual Slop session — allow through silently
logging.debug(f"CLAUDE_CLI_HOOK_CONTEXT is '{hook_context}', not 'manual_slop'. Allowing.") logging.debug(f"CLAUDE_CLI_HOOK_CONTEXT is '{hook_context}', not 'manual_slop'. Allowing.")
print(json.dumps({"decision": "allow", "reason": f"Non-programmatic usage (CLAUDE_CLI_HOOK_CONTEXT={hook_context})."})) print(json.dumps({"decision": "allow", "reason": f"Non-programmatic usage (CLAUDE_CLI_HOOK_CONTEXT={hook_context})."}))
return return
# manual_slop context: route to GUI for approval # manual_slop context: route to GUI for approval
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'manual_slop'. Routing to API Hook Client.") logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'manual_slop'. Routing to API Hook Client.")
client = ApiHookClient(base_url="http://127.0.0.1:8999") client = ApiHookClient(base_url="http://127.0.0.1:8999")
@@ -78,11 +69,9 @@ def main() -> None:
except Exception as e: except Exception as e:
logging.error(f"API Hook Client error: {str(e)}", exc_info=True) logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
print(json.dumps({"decision": "deny", "reason": f"Manual Slop hook server unreachable: {str(e)}"})) print(json.dumps({"decision": "deny", "reason": f"Manual Slop hook server unreachable: {str(e)}"}))
except Exception as e: except Exception as e:
logging.error(f"Unexpected error in bridge: {str(e)}", exc_info=True) logging.error(f"Unexpected error in bridge: {str(e)}", exc_info=True)
print(json.dumps({"decision": "deny", "reason": f"Internal bridge error: {str(e)}"})) print(json.dumps({"decision": "deny", "reason": f"Internal bridge error: {str(e)}"}))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -3,12 +3,10 @@ import re
with open('mcp_client.py', 'r', encoding='utf-8') as f: with open('mcp_client.py', 'r', encoding='utf-8') as f:
content: str = f.read() content: str = f.read()
# 1. Add import os if not there
# 1. Add import os if not there
if 'import os' not in content: if 'import os' not in content:
content: str = content.replace('import summarize', 'import os\nimport summarize') content: str = content.replace('import summarize', 'import os\nimport summarize')
# 2. Add the functions before "# ------------------------------------------------------------------ web tools"
# 2. Add the functions before "# ------------------------------------------------------------------ web tools"
functions_code: str = r''' functions_code: str = r'''
def py_find_usages(path: str, name: str) -> str: def py_find_usages(path: str, name: str) -> str:
"""Finds exact string matches of a symbol in a given file or directory.""" """Finds exact string matches of a symbol in a given file or directory."""
@@ -187,8 +185,7 @@ if old_tool_names_match:
old_names: str = old_tool_names_match.group(1) old_names: str = old_tool_names_match.group(1)
new_names: str = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"' new_names: str = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"'
content: str = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}') content: str = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}')
# 4. Update dispatch
# 4. Update dispatch
dispatch_additions: str = r''' dispatch_additions: str = r'''
if tool_name == "py_find_usages": if tool_name == "py_find_usages":
return py_find_usages(tool_input.get("path", ""), tool_input.get("name", "")) return py_find_usages(tool_input.get("path", ""), tool_input.get("name", ""))
@@ -205,7 +202,7 @@ dispatch_additions: str = r'''
return f"ERROR: unknown MCP tool '{tool_name}'" return f"ERROR: unknown MCP tool '{tool_name}'"
''' '''
content: str = re.sub( content: str = re.sub(
r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content) r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content)
# 5. Update MCP_TOOL_SPECS # 5. Update MCP_TOOL_SPECS
mcp_tool_specs_addition: str = r''' mcp_tool_specs_addition: str = r'''
@@ -283,7 +280,7 @@ mcp_tool_specs_addition: str = r'''
''' '''
content: str = re.sub( content: str = re.sub(
r'\]\s*$', mcp_tool_specs_addition.strip(), content) r'\]\s*$', mcp_tool_specs_addition.strip(), content)
with open('mcp_client.py', 'w', encoding='utf-8') as f: with open('mcp_client.py', 'w', encoding='utf-8') as f:
f.write(content) f.write(content)

View File

@@ -46,7 +46,6 @@ RUN_POWERSHELL_SPEC = {
server = Server("manual-slop-tools") server = Server("manual-slop-tools")
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
tools = [] tools = []
@@ -64,7 +63,6 @@ async def list_tools() -> list[Tool]:
)) ))
return tools return tools
@server.call_tool() @server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]: async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try: try:
@@ -77,9 +75,8 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:
except Exception as e: except Exception as e:
return [TextContent(type="text", text=f"ERROR: {e}")] return [TextContent(type="text", text=f"ERROR: {e}")]
async def main() -> None: async def main() -> None:
# Configure mcp_client with the project root so py_* tools are not ACCESS DENIED # Configure mcp_client with the project root so py_* tools are not ACCESS DENIED
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
mcp_client.configure([], extra_base_dirs=[project_root]) mcp_client.configure([], extra_base_dirs=[project_root])
async with stdio_server() as (read_stream, write_stream): async with stdio_server() as (read_stream, write_stream):
@@ -89,6 +86,5 @@ async def main() -> None:
server.create_initialization_options(), server.create_initialization_options(),
) )
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

View File

@@ -18,7 +18,9 @@ for root, dirs, files in os.walk('.'):
except Exception: except Exception:
continue continue
counts: list[int] = [0, 0, 0] # nr, up, uv counts: list[int] = [0, 0, 0] # nr, up, uv
def scan(scope: ast.AST, prefix: str = '') -> None: def scan(scope: ast.AST, prefix: str = '') -> None:
# Iterate top-level nodes in this scope
for node in ast.iter_child_nodes(scope): for node in ast.iter_child_nodes(scope):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.returns is None: if node.returns is None:
@@ -26,11 +28,11 @@ for root, dirs, files in os.walk('.'):
for arg in node.args.args: for arg in node.args.args:
if arg.arg not in ('self', 'cls') and arg.annotation is None: if arg.arg not in ('self', 'cls') and arg.annotation is None:
counts[1] += 1 counts[1] += 1
if isinstance(node, ast.Assign): elif isinstance(node, ast.Assign):
for t in node.targets: for t in node.targets:
if isinstance(t, ast.Name): if isinstance(t, ast.Name):
counts[2] += 1 counts[2] += 1
if isinstance(node, ast.ClassDef): elif isinstance(node, ast.ClassDef):
scan(node, prefix=f'{node.name}.') scan(node, prefix=f'{node.name}.')
scan(tree) scan(tree)
nr, up, uv = counts nr, up, uv = counts

View File

@@ -1,4 +1,4 @@
import sys import sys
import json import json
import os import os
import io import io
@@ -39,6 +39,7 @@ def main() -> None:
result = shell_runner.run_powershell(script, os.getcwd()) result = shell_runner.run_powershell(script, os.getcwd())
else: else:
# mcp_client tools generally resolve paths relative to CWD if not configured. # mcp_client tools generally resolve paths relative to CWD if not configured.
mcp_client.configure([], [os.getcwd()])
result = mcp_client.dispatch(tool_name, tool_input) result = mcp_client.dispatch(tool_name, tool_input)
# We print the raw result string as that's what gemini-cli expects. # We print the raw result string as that's what gemini-cli expects.
print(result) print(result)
@@ -48,3 +49,4 @@ def main() -> None:
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -30,8 +30,10 @@ _comms_fh: Optional[TextIO] = None # file handle: logs/<session_id>/comms.log
_tool_fh: Optional[TextIO] = None # file handle: logs/<session_id>/toolcalls.log _tool_fh: Optional[TextIO] = None # file handle: logs/<session_id>/toolcalls.log
_api_fh: Optional[TextIO] = None # file handle: logs/<session_id>/apihooks.log _api_fh: Optional[TextIO] = None # file handle: logs/<session_id>/apihooks.log
_cli_fh: Optional[TextIO] = None # file handle: logs/<session_id>/clicalls.log _cli_fh: Optional[TextIO] = None # file handle: logs/<session_id>/clicalls.log
def _now_ts() -> str: def _now_ts() -> str:
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S") return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def open_session(label: Optional[str] = None) -> None: def open_session(label: Optional[str] = None) -> None:
""" """
Called once at GUI startup. Creates the log directories if needed and Called once at GUI startup. Creates the log directories if needed and
@@ -64,6 +66,7 @@ def open_session(label: Optional[str] = None) -> None:
except Exception as e: except Exception as e:
print(f"Warning: Could not register session in LogRegistry: {e}") print(f"Warning: Could not register session in LogRegistry: {e}")
atexit.register(close_session) atexit.register(close_session)
def close_session() -> None: def close_session() -> None:
"""Flush and close all log files. Called on clean exit.""" """Flush and close all log files. Called on clean exit."""
global _comms_fh, _tool_fh, _api_fh, _cli_fh, _session_id, _LOG_DIR global _comms_fh, _tool_fh, _api_fh, _cli_fh, _session_id, _LOG_DIR
@@ -87,6 +90,7 @@ def close_session() -> None:
registry.update_auto_whitelist_status(_session_id) registry.update_auto_whitelist_status(_session_id)
except Exception as e: except Exception as e:
print(f"Warning: Could not update auto-whitelist on close: {e}") print(f"Warning: Could not update auto-whitelist on close: {e}")
def log_api_hook(method: str, path: str, payload: str) -> None: def log_api_hook(method: str, path: str, payload: str) -> None:
"""Log an API hook invocation.""" """Log an API hook invocation."""
if _api_fh is None: if _api_fh is None:
@@ -97,6 +101,7 @@ def log_api_hook(method: str, path: str, payload: str) -> None:
_api_fh.flush() _api_fh.flush()
except Exception: except Exception:
pass pass
def log_comms(entry: dict[str, Any]) -> None: def log_comms(entry: dict[str, Any]) -> None:
""" """
Append one comms entry to the comms log file as a JSON-L line. Append one comms entry to the comms log file as a JSON-L line.
@@ -108,6 +113,7 @@ def log_comms(entry: dict[str, Any]) -> None:
_comms_fh.write(json.dumps(entry, ensure_ascii=False, default=str) + "\n") _comms_fh.write(json.dumps(entry, ensure_ascii=False, default=str) + "\n")
except Exception: except Exception:
pass pass
def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]: def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]:
""" """
Append a tool-call record to the toolcalls log and write the PS1 script to Append a tool-call record to the toolcalls log and write the PS1 script to
@@ -139,6 +145,7 @@ def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optio
except Exception: except Exception:
pass pass
return str(ps1_path) if ps1_path else None return str(ps1_path) if ps1_path else None
def log_cli_call(command: str, stdin_content: Optional[str], stdout_content: Optional[str], stderr_content: Optional[str], latency: float) -> None: def log_cli_call(command: str, stdin_content: Optional[str], stdout_content: Optional[str], stderr_content: Optional[str], latency: float) -> None:
"""Log details of a CLI subprocess execution.""" """Log details of a CLI subprocess execution."""
if _cli_fh is None: if _cli_fh is None:

View File

@@ -4,12 +4,11 @@ if '"role": "tool"' in prompt:
print(json.dumps({"type": "message", "role": "assistant", "content": "Tool worked!"}), flush=True) print(json.dumps({"type": "message", "role": "assistant", "content": "Tool worked!"}), flush=True)
print(json.dumps({"type": "result", "stats": {"total_tokens": 20}}), flush=True) print(json.dumps({"type": "result", "stats": {"total_tokens": 20}}), flush=True)
else: else:
# We must call the bridge to trigger the GUI approval! # We must call the bridge to trigger the GUI approval!
tool_call = {"name": "list_directory", "input": {"dir_path": "."}} tool_call = {"name": "list_directory", "input": {"dir_path": "."}}
bridge_cmd = [sys.executable, "C:/projects/manual_slop/scripts/cli_tool_bridge.py"] bridge_cmd = [sys.executable, "C:/projects/manual_slop/scripts/cli_tool_bridge.py"]
proc = subprocess.Popen(bridge_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) proc = subprocess.Popen(bridge_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate(input=json.dumps(tool_call)) stdout, _ = proc.communicate(input=json.dumps(tool_call))
# Even if bridge says allow, we emit the tool_use to the adapter # Even if bridge says allow, we emit the tool_use to the adapter
print(json.dumps({"type": "message", "role": "assistant", "content": "I will list the directory."}), flush=True) print(json.dumps({"type": "message", "role": "assistant", "content": "I will list the directory."}), flush=True)
print(json.dumps({ print(json.dumps({

View File

@@ -10,5 +10,5 @@ auto_add = true
[discussions.main] [discussions.main]
git_commit = "" git_commit = ""
last_updated = "2026-02-28T07:35:49" last_updated = "2026-02-28T19:35:01"
history = [] history = []

View File

@@ -7,6 +7,5 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import ai_client import ai_client
def test_agent_capabilities_listing(): def test_agent_capabilities_listing() -> None:
# Verify that the agent exposes its available tools correctly
pass pass

View File

@@ -26,7 +26,7 @@ class MockCandidate:
self.finish_reason.name = "STOP" self.finish_reason.name = "STOP"
def test_ai_client_event_emitter_exists() -> None: def test_ai_client_event_emitter_exists() -> None:
# This should fail initially because 'events' won't exist on ai_client # This should fail initially because 'events' won't exist on ai_client
assert hasattr(ai_client, 'events') assert hasattr(ai_client, 'events')
def test_event_emission() -> None: def test_event_emission() -> None:

View File

@@ -40,8 +40,7 @@ def test_has_cycle_indirect_cycle() -> None:
dag = TrackDAG([t1, t2, t3]) dag = TrackDAG([t1, t2, t3])
assert dag.has_cycle() assert dag.has_cycle()
def test_has_cycle_complex_no_cycle(): def test_has_cycle_complex_no_cycle() -> None:
# T1 -> T2, T1 -> T3, T2 -> T4, T3 -> T4
t1 = Ticket(id="T1", description="T1", status="todo", assigned_to="worker", depends_on=["T2", "T3"]) t1 = Ticket(id="T1", description="T1", status="todo", assigned_to="worker", depends_on=["T2", "T3"])
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="worker", depends_on=["T4"]) t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="worker", depends_on=["T4"])
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="worker", depends_on=["T4"]) t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="worker", depends_on=["T4"])

View File

@@ -2,8 +2,7 @@ import pytest
from models import Ticket from models import Ticket
from dag_engine import TrackDAG, ExecutionEngine from dag_engine import TrackDAG, ExecutionEngine
def test_execution_engine_basic_flow(): def test_execution_engine_basic_flow() -> None:
# Setup tickets with dependencies
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker") t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"]) t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"])
t3 = Ticket(id="T3", description="Task 3", status="todo", assigned_to="worker", depends_on=["T1"]) t3 = Ticket(id="T3", description="Task 3", status="todo", assigned_to="worker", depends_on=["T1"])

View File

@@ -1,3 +1,4 @@
from typing import Any
import pytest import pytest
from unittest.mock import MagicMock, patch, call from unittest.mock import MagicMock, patch, call
from models import Ticket, Track, WorkerContext from models import Ticket, Track, WorkerContext
@@ -33,7 +34,7 @@ async def test_headless_verification_full_run() -> None:
assert mock_reset.call_count == 2 assert mock_reset.call_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor(): async def test_headless_verification_error_and_qa_interceptor() -> None:
""" """
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered 5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry. and its summary is injected into the worker's history for the next retry.
@@ -54,7 +55,7 @@ async def test_headless_verification_error_and_qa_interceptor():
# Ensure _gemini_client is restored by the mock ensure function # Ensure _gemini_client is restored by the mock ensure function
import ai_client import ai_client
def restore_client(): def restore_client() -> None:
ai_client._gemini_client = mock_genai_client ai_client._gemini_client = mock_genai_client
mock_ensure.side_effect = restore_client mock_ensure.side_effect = restore_client
ai_client._gemini_client = mock_genai_client ai_client._gemini_client = mock_genai_client
@@ -86,7 +87,7 @@ async def test_headless_verification_error_and_qa_interceptor():
mock_chat.send_message.side_effect = [mock_resp1, mock_resp2] mock_chat.send_message.side_effect = [mock_resp1, mock_resp2]
# Mock run_powershell behavior: it should call the qa_callback on error # Mock run_powershell behavior: it should call the qa_callback on error
def run_side_effect(script, base_dir, qa_callback): def run_side_effect(script: Any, base_dir: Any, qa_callback: Any) -> Any:
if qa_callback: if qa_callback:
analysis = qa_callback("Error: file not found") analysis = qa_callback("Error: file not found")
return f"""STDERR: Error: file not found return f"""STDERR: Error: file not found
@@ -117,3 +118,4 @@ QA ANALYSIS:
if "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str: if "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str:
found_qa = True found_qa = True
assert found_qa, "QA Analysis was not injected into the next round" assert found_qa, "QA Analysis was not injected into the next round"

View File

@@ -1,4 +1,4 @@
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import os import os
from pathlib import Path from pathlib import Path
@@ -7,7 +7,6 @@ from pathlib import Path
import gui_2 import gui_2
from gui_2 import App from gui_2 import App
@pytest.fixture
@pytest.fixture @pytest.fixture
def mock_config(tmp_path: Path) -> Path: def mock_config(tmp_path: Path) -> Path:
config_path = tmp_path / "config.toml" config_path = tmp_path / "config.toml"
@@ -20,7 +19,6 @@ model = "model"
""", encoding="utf-8") """, encoding="utf-8")
return config_path return config_path
@pytest.fixture
@pytest.fixture @pytest.fixture
def mock_project(tmp_path: Path) -> Path: def mock_project(tmp_path: Path) -> Path:
project_path = tmp_path / "project.toml" project_path = tmp_path / "project.toml"
@@ -34,7 +32,6 @@ history = []
""", encoding="utf-8") """, encoding="utf-8")
return project_path return project_path
@pytest.fixture
@pytest.fixture @pytest.fixture
def app_instance(mock_config: Path, mock_project: Path, monkeypatch: pytest.MonkeyPatch) -> App: def app_instance(mock_config: Path, mock_project: Path, monkeypatch: pytest.MonkeyPatch) -> App:
monkeypatch.setattr("gui_2.CONFIG_PATH", mock_config) monkeypatch.setattr("gui_2.CONFIG_PATH", mock_config)
@@ -95,3 +92,4 @@ def test_render_log_management_logic(app_instance: App) -> None:
mock_begin.assert_called_with("Log Management", app.show_windows["Log Management"]) mock_begin.assert_called_with("Log Management", app.show_windows["Log Management"])
mock_begin_table.assert_called() mock_begin_table.assert_called()
mock_text.assert_any_call("session_1") mock_text.assert_any_call("session_1")

View File

@@ -8,8 +8,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import mcp_client import mcp_client
def test_mcp_perf_tool_retrieval(): def test_mcp_perf_tool_retrieval() -> None:
# Test that the MCP tool can call performance_monitor metrics
mock_metrics = {"fps": 60, "last_frame_time_ms": 16.6} mock_metrics = {"fps": 60, "last_frame_time_ms": 16.6}
# Simulate tool call by patching the callback # Simulate tool call by patching the callback
with patch('mcp_client.perf_monitor_callback', return_value=mock_metrics): with patch('mcp_client.perf_monitor_callback', return_value=mock_metrics):

View File

@@ -10,7 +10,7 @@ import session_logger
@pytest.fixture @pytest.fixture
def temp_logs(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[Path, None, None]: def temp_logs(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[Path, None, None]:
# Ensure closed before starting # Ensure closed before starting
session_logger.close_session() session_logger.close_session()
monkeypatch.setattr(session_logger, "_comms_fh", None) monkeypatch.setattr(session_logger, "_comms_fh", None)
# Mock _LOG_DIR in session_logger # Mock _LOG_DIR in session_logger

View File

@@ -8,7 +8,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from simulation.sim_ai_settings import AISettingsSimulation from simulation.sim_ai_settings import AISettingsSimulation
def test_ai_settings_simulation_run(): def test_ai_settings_simulation_run() -> None:
mock_client = MagicMock() mock_client = MagicMock()
mock_client.wait_for_server.return_value = True mock_client.wait_for_server.return_value = True
mock_client.get_value.side_effect = lambda key: { mock_client.get_value.side_effect = lambda key: {

View File

@@ -8,7 +8,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from simulation.sim_execution import ExecutionSimulation from simulation.sim_execution import ExecutionSimulation
def test_execution_simulation_run(): def test_execution_simulation_run() -> None:
mock_client = MagicMock() mock_client = MagicMock()
mock_client.wait_for_server.return_value = True mock_client.wait_for_server.return_value = True
# Mock show_confirm_modal state # Mock show_confirm_modal state

View File

@@ -13,7 +13,7 @@ def test_api_ask_client_method(live_gui) -> None:
client.get_events() client.get_events()
results = {"response": None, "error": None} results = {"response": None, "error": None}
def make_blocking_request() -> None: def make_blocking_request() -> None:
try: try:
# This call should block until we respond # This call should block until we respond
results["response"] = client.request_confirmation( results["response"] = client.request_confirmation(

0
worker_debug.log Normal file
View File