import argparse import subprocess import os import ast import datetime import re import tomllib import tree_sitter import tree_sitter_python LOG_FILE: str = 'logs/claude_mma_delegation.log' MODEL_MAP: dict[str, str] = { 'tier1-orchestrator': 'claude-opus-4-6', 'tier1': 'claude-opus-4-6', 'tier2-tech-lead': 'claude-sonnet-4-6', 'tier2': 'claude-sonnet-4-6', 'tier3-worker': 'claude-sonnet-4-6', 'tier3': 'claude-sonnet-4-6', 'tier4-qa': 'claude-haiku-4-5', 'tier4': 'claude-haiku-4-5', } def generate_skeleton(code: str) -> str: """ Parses Python code and replaces function/method bodies with '...', preserving docstrings if present. """ try: PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language()) parser = tree_sitter.Parser(PY_LANGUAGE) tree = parser.parse(bytes(code, "utf8")) edits = [] def is_docstring(node): if node.type == "expression_statement" and node.child_count > 0: if node.children[0].type == "string": return True return False def walk(node): if node.type == "function_definition": body = node.child_by_field_name("body") if body and body.type == "block": indent = " " * body.start_point.column first_stmt = None for child in body.children: if child.type != "comment": first_stmt = child break if first_stmt and is_docstring(first_stmt): start_byte = first_stmt.end_byte end_byte = body.end_byte if end_byte > start_byte: edits.append((start_byte, end_byte, f"\n{indent}...")) else: start_byte = body.start_byte end_byte = body.end_byte edits.append((start_byte, end_byte, "...")) for child in node.children: walk(child) walk(tree.root_node) edits.sort(key=lambda x: x[0], reverse=True) code_bytes = bytearray(code, "utf8") for start, end, replacement in edits: code_bytes[start:end] = bytes(replacement, "utf8") return code_bytes.decode("utf8") except Exception as e: return f"# Error generating skeleton: {e}\n{code}" def get_model_for_role(role: str) -> str: """Returns the Claude model to use for a given tier role.""" return MODEL_MAP.get(role, 'claude-haiku-4-5') def get_role_documents(role: str) -> list[str]: if role in ('tier1-orchestrator', 'tier1'): return ['conductor/product.md', 'conductor/product-guidelines.md'] elif role in ('tier2-tech-lead', 'tier2'): return ['conductor/tech-stack.md', 'conductor/workflow.md'] elif role in ('tier3-worker', 'tier3'): return ['conductor/workflow.md'] return [] def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str: os.makedirs('logs/claude_agents', exist_ok=True) timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log' with open(log_file, 'w', encoding='utf-8') as f: f.write("==================================================\n") f.write(f"ROLE: {role}\n") f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("--------------------------------------------------\n") f.write(f"FULL PROMPT:\n{full_prompt}\n") f.write("--------------------------------------------------\n") if result: f.write(f"RESULT:\n{result}\n") f.write("==================================================\n") os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) display_prompt = summary_prompt if summary_prompt else full_prompt with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n") return log_file def get_dependencies(filepath: str) -> list[str]: """Identify top-level module imports from a Python file.""" try: with open(filepath, 'r', encoding='utf-8') as f: tree = ast.parse(f.read()) dependencies = [] for node in tree.body: if isinstance(node, ast.Import): for alias in node.names: dependencies.append(alias.name.split('.')[0]) elif isinstance(node, ast.ImportFrom): if node.module: dependencies.append(node.module.split('.')[0]) seen = set() result = [] for d in dependencies: if d not in seen: result.append(d) seen.add(d) return result except Exception as e: print(f"Error getting dependencies for {filepath}: {e}") return [] def execute_agent(role: str, prompt: str, docs: list[str]) -> str: model = get_model_for_role(role) # Advanced Context: Dependency skeletons for Tier 3 injected_context = "" UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate'] if role in ['tier3', 'tier3-worker']: for doc in docs: if doc.endswith('.py') and os.path.exists(doc): deps = get_dependencies(doc) for dep in deps: dep_file = f"{dep}.py" if dep_file in docs: continue if os.path.exists(dep_file) and dep_file != doc: try: if dep in UNFETTERED_MODULES: with open(dep_file, 'r', encoding='utf-8') as f: full_content = f.read() injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n" else: with open(dep_file, 'r', encoding='utf-8') as f: skeleton = generate_skeleton(f.read()) injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n" except Exception as e: print(f"Error gathering context for {dep_file}: {e}") if len(injected_context) > 15000: injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" # MMA Protocol: Tier 3 and 4 are stateless. Build system directive. if role in ['tier3', 'tier3-worker']: system_directive = ( "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " "Your goal is to implement specific code changes or tests based on the provided task. " "You have access to tools for reading and writing files (Read, Write, Edit), " "codebase investigation (Glob, Grep), " "version control (Bash git commands), and web tools (WebFetch, WebSearch). " "You CAN execute PowerShell scripts via Bash for verification and testing. " "Follow TDD and return success status or code changes. No pleasantries, no conversational filler." ) elif role in ['tier4', 'tier4-qa']: system_directive = ( "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " "Your goal is to analyze errors, summarize logs, or verify tests. " "You have access to tools for reading files and exploring the codebase (Read, Glob, Grep). " "You CAN execute PowerShell scripts via Bash (read-only) for diagnostics. " "ONLY output the requested analysis. No pleasantries." ) else: system_directive = ( f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " "ONLY output the requested text. No pleasantries." ) command_text = f"{system_directive}\n\n{injected_context}\n\n" # Inline documents to ensure sub-agent has context in headless mode for doc in docs: if os.path.exists(doc): try: with open(doc, 'r', encoding='utf-8') as f: content = f.read() command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" except Exception as e: print(f"Error inlining {doc}: {e}") command_text += f"\n\nTASK: {prompt}\n\n" # Spawn claude CLI non-interactively via PowerShell ps_command = ( "if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') " "{ . 'C:\\projects\\misc\\setup_claude.ps1' }; " f"claude --model {model} --print --dangerously-skip-permissions" ) cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] try: env = os.environ.copy() env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless' env.pop('ANTHROPIC_API_KEY', None) # Force CLI to use subscription login, not API key process = subprocess.run( cmd, input=command_text, capture_output=True, text=True, encoding='utf-8', env=env ) # claude --print outputs plain text — no JSON parsing needed result = process.stdout if process.stdout else f"Error: {process.stderr}" log_file = log_delegation(role, command_text, result, summary_prompt=prompt) print(f"Sub-agent log created: {log_file}") return result except Exception as e: err_msg = f"Execution failed: {str(e)}" log_delegation(role, command_text, err_msg) return err_msg def create_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Claude MMA Execution Script") parser.add_argument( "--role", choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'], help="The tier role to execute" ) parser.add_argument( "--task-file", type=str, help="TOML file defining the task" ) parser.add_argument( "prompt", type=str, nargs='?', help="The prompt for the tier (optional if --task-file is used)" ) return parser def main() -> None: parser = create_parser() args = parser.parse_args() role = args.role prompt = args.prompt docs = [] if args.task_file and os.path.exists(args.task_file): with open(args.task_file, "rb") as f: task_data = tomllib.load(f) role = task_data.get("role", role) prompt = task_data.get("prompt", prompt) docs = task_data.get("docs", []) if not role or not prompt: parser.print_help() return if not docs: docs = get_role_documents(role) # Extract @file references from the prompt file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt) for ref in file_refs: if os.path.exists(ref) and ref not in docs: docs.append(ref) print(f"Executing role: {role} with docs: {docs}") result = execute_agent(role, prompt, docs) print(result) if __name__ == "__main__": main()