From 3a2856b27d344d3509634130af7a7b9c84518099 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Wed, 25 Feb 2026 23:11:42 -0500 Subject: [PATCH] pain --- scripts/mma_exec.py | 88 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/scripts/mma_exec.py b/scripts/mma_exec.py index ac101ac..c94c70c 100644 --- a/scripts/mma_exec.py +++ b/scripts/mma_exec.py @@ -2,6 +2,7 @@ import argparse import subprocess import json import os +import tomllib import tree_sitter import tree_sitter_python import ast @@ -122,7 +123,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str: injected_context = "" # Whitelist of modules that sub-agents have "unfettered" (full) access to. # These will be provided in full if imported, instead of just skeletons. - UNFETTERED_MODULES = ['mcp_client', 'project_manager'] + UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] if role in ['tier3', 'tier3-worker']: for doc in docs: @@ -131,6 +132,12 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str: for dep in deps: # Only try to generate skeletons for files that exist in the local dir dep_file = f"{dep}.py" + + # Optimization: If the dependency is already in 'docs' (explicitly provided), + # do NOT inject its skeleton/full context again as a dependency. + if dep_file in docs: + continue + if os.path.exists(dep_file) and dep_file != doc: try: if dep in UNFETTERED_MODULES: @@ -148,19 +155,45 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str: if len(injected_context) > 15000: injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" - # MMA Protocol: Tier 3 and 4 are stateless and tool-less. - system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ - "DO NOT USE ANY TOOLS (no write_file, no run_shell_command, etc.). " \ - "ONLY output the requested text, code, or diff. No pleasantries." + # MMA Protocol: Tier 3 and 4 are stateless. + if role in ['tier3', 'tier3-worker']: + system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \ + "Your goal is to generate high-quality code or diffs based on the provided task. " \ + "DO NOT USE ANY TOOLS (no write_file, no run_shell_command, etc.). " \ + "ONLY output the clean code or the requested diff. No pleasantries, no conversational filler." + elif role in ['tier4', 'tier4-qa']: + system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \ + "Your goal is to analyze errors, summarize logs, or verify tests. " \ + "DO NOT USE ANY TOOLS. ONLY output the requested analysis. No pleasantries." + else: + system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ + "DO NOT USE ANY TOOLS. ONLY output the requested text. No pleasantries." - command_text = f"{system_directive}\n\nUse the mma-{role} skill. {injected_context}{prompt}" - for doc in docs: - command_text += f" @{doc}" + command_text = f"{system_directive}\n\n{injected_context}\n\n" - # Use --approval-mode plan to ensure the agent is in read-only mode. - cmd = ['gemini', '-p', command_text, '--output-format', 'json', '--model', model, '--approval-mode', 'plan'] + # Manually inline documents to ensure sub-agent has context in headless mode + for doc in docs: + if os.path.exists(doc): + try: + with open(doc, 'r', encoding='utf-8') as f: + content = f.read() + command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" + except Exception as e: + print(f"Error inlining {doc}: {e}") + + command_text += f"\n\nTASK: {prompt}\n\n" + + # Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206. + # We use -p 'task' to ensure non-interactive (headless) mode and valid parsing. + ps_command = ( + f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; " + f"gemini -p 'task' --output-format json --model {model}" + ) + cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] + try: - process = subprocess.run(cmd, capture_output=True, text=True, shell=True) + process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8') + if not process.stdout and process.stderr: return f"Error: {process.stderr}" @@ -182,13 +215,18 @@ def create_parser(): parser.add_argument( "--role", choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'], - required=True, help="The tier role to execute" ) + parser.add_argument( + "--task-file", + type=str, + help="TOML file defining the task" + ) parser.add_argument( "prompt", type=str, - help="The prompt for the tier" + nargs='?', + help="The prompt for the tier (optional if --task-file is used)" ) return parser @@ -196,17 +234,33 @@ def main(): parser = create_parser() args = parser.parse_args() - docs = get_role_documents(args.role) + role = args.role + prompt = args.prompt + docs = [] + + if args.task_file and os.path.exists(args.task_file): + with open(args.task_file, "rb") as f: + task_data = tomllib.load(f) + role = task_data.get("role", role) + prompt = task_data.get("prompt", prompt) + docs = task_data.get("docs", []) + + if not role or not prompt: + parser.print_help() + return + + if not docs: + docs = get_role_documents(role) # Extract @file references from the prompt import re - file_refs = re.findall(r"@([\w./\\]+)", args.prompt) + file_refs = re.findall(r"@([\w./\\]+)", prompt) for ref in file_refs: if os.path.exists(ref) and ref not in docs: docs.append(ref) - print(f"Executing role: {args.role} with docs: {docs}") - result = execute_agent(args.role, args.prompt, docs) + print(f"Executing role: {role} with docs: {docs}") + result = execute_agent(role, prompt, docs) print(result) if __name__ == "__main__":