Files
manual_slop/scripts/mma_exec.py

214 lines
8.2 KiB
Python

import argparse
import subprocess
import json
import os
import tree_sitter
import tree_sitter_python
import ast
import datetime
# Relative path of the delegation log; log_delegation() appends one record per call.
LOG_FILE = 'logs/mma_delegation.log'
def generate_skeleton(code: str) -> str:
    """
    Parses Python code and replaces function/method bodies with '...',
    preserving docstrings if present.

    Only the outermost function of a nesting chain is stubbed: once a body
    has been scheduled for replacement its contents are not visited again,
    so nested definitions cannot produce edits that overlap the outer one.

    Args:
        code: Python source text to reduce to a skeleton.

    Returns:
        The skeletonized source. If anything goes wrong while parsing or
        editing, the original code is returned prefixed with a
        '# Error generating skeleton: ...' comment line instead of raising.
    """
    try:
        PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
        parser = tree_sitter.Parser(PY_LANGUAGE)
        tree = parser.parse(bytes(code, "utf8"))
        # (start_byte, end_byte, replacement) triples against the ORIGINAL bytes.
        edits = []

        def is_docstring(node):
            # A docstring is an expression statement whose first child is a string.
            if node.type == "expression_statement" and node.child_count > 0:
                if node.children[0].type == "string":
                    return True
            return False

        def walk(node):
            if node.type == "function_definition":
                body = node.child_by_field_name("body")
                if body and body.type == "block":
                    indent = " " * body.start_point.column
                    # Leading comments do not count as the first statement.
                    first_stmt = None
                    for child in body.children:
                        if child.type != "comment":
                            first_stmt = child
                            break
                    if first_stmt and is_docstring(first_stmt):
                        # Keep the docstring, replace everything after it.
                        start_byte = first_stmt.end_byte
                        end_byte = body.end_byte
                        if end_byte > start_byte:
                            edits.append((start_byte, end_byte, f"\n{indent}..."))
                    else:
                        edits.append((body.start_byte, body.end_byte, "..."))
                    # Do not descend into a body that is being replaced: an edit
                    # for a nested def would lie inside this edit's byte range,
                    # and applying it first would invalidate this edit's end
                    # offset, deleting code that follows the outer function.
                    return
            for child in node.children:
                walk(child)

        walk(tree.root_node)
        # Apply from the end of the file backwards so earlier offsets stay valid.
        edits.sort(key=lambda x: x[0], reverse=True)
        code_bytes = bytearray(code, "utf8")
        for start, end, replacement in edits:
            code_bytes[start:end] = bytes(replacement, "utf8")
        return code_bytes.decode("utf8")
    except Exception as e:
        # Best-effort: callers always get usable text back.
        return f"# Error generating skeleton: {e}\n{code}"
def get_model_for_role(role: str) -> str:
    """Map a tier role (short or long spelling) to the model name it runs on."""
    if role in ('tier1-orchestrator', 'tier1'):
        return 'gemini-3.1-pro-preview'
    if role in ('tier2-tech-lead', 'tier2'):
        return 'gemini-3-flash-preview'
    # Any other role falls through to the lite model.
    return 'gemini-2.5-flash-lite'
def get_role_documents(role: str) -> list[str]:
    """Return the default document paths attached to a tier role.

    Tier 1, 2 and 3 (short or long spelling) each have a fixed list; any
    other role gets an empty list. A fresh list is built on every call, so
    callers may append to the result.
    """
    # Built per call so the returned list is never shared between callers.
    documents_by_alias = (
        (('tier1-orchestrator', 'tier1'),
         ['conductor/product.md', 'conductor/product-guidelines.md']),
        (('tier2-tech-lead', 'tier2'),
         ['conductor/tech-stack.md', 'conductor/workflow.md']),
        (('tier3-worker', 'tier3'),
         ['conductor/workflow.md']),
    )
    for aliases, documents in documents_by_alias:
        if role in aliases:
            return documents
    return []
def log_delegation(role, prompt):
    """Append one delimited record (timestamp, tier, prompt) to LOG_FILE.

    The log's parent directory is created first if it does not exist.
    """
    log_dir = os.path.dirname(LOG_FILE)
    os.makedirs(log_dir, exist_ok=True)
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    separator = "--------------------------------------------------\n"
    record = (
        separator,
        f"TIMESTAMP: {stamp}\n",
        f"TIER: {role}\n",
        f"PROMPT: {prompt}\n",
        separator,
    )
    with open(LOG_FILE, 'a', encoding='utf-8') as log:
        log.writelines(record)
def get_dependencies(filepath):
    """Identify top-level module imports from a Python file.

    Only statements directly in the module body are inspected. Each import
    is reduced to its first dotted component, `from . import x` style
    imports (no module name) are skipped, and duplicates are dropped while
    keeping first-seen order.

    Returns an empty list, after printing the error, if the file cannot be
    read or parsed.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as src:
            module = ast.parse(src.read())
        roots = []
        for stmt in module.body:
            if isinstance(stmt, ast.Import):
                roots.extend(alias.name.split('.')[0] for alias in stmt.names)
            elif isinstance(stmt, ast.ImportFrom) and stmt.module:
                roots.append(stmt.module.split('.')[0])
        # dict keys keep insertion order, giving an order-preserving dedup.
        return list(dict.fromkeys(roots))
    except Exception as e:
        print(f"Error getting dependencies for {filepath}: {e}")
        return []
def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
    """Run the `gemini` CLI for the given tier role and return its response.

    The delegation is logged, a model is chosen for the role, and the prompt
    is wrapped in a strict "stateless, no tools" directive. For tier 3 roles,
    local modules imported by any `.py` file in `docs` are injected as extra
    context: in full for whitelisted modules, as skeletons otherwise.

    Args:
        role: Tier role name (short or long spelling).
        prompt: The task prompt to send.
        docs: Paths appended to the prompt as `@path` references.

    Returns:
        The 'response' field of the CLI's JSON output when it can be parsed,
        otherwise the raw stdout. Failures are reported as a string starting
        with 'Error: ' (stderr only) or 'Execution failed: ' (exception);
        this function does not raise for subprocess errors.
    """
    # Function-scope import, so no file-level import change is needed.
    import shutil

    log_delegation(role, prompt)
    model = get_model_for_role(role)
    # Advanced Context: Dependency skeletons for Tier 3
    injected_context = ""
    # Whitelist of modules that sub-agents have "unfettered" (full) access to.
    # These will be provided in full if imported, instead of just skeletons.
    UNFETTERED_MODULES = ['mcp_client', 'project_manager']
    if role in ['tier3', 'tier3-worker']:
        for doc in docs:
            if doc.endswith('.py') and os.path.exists(doc):
                deps = get_dependencies(doc)
                for dep in deps:
                    # Only try to generate skeletons for files that exist in the local dir
                    dep_file = f"{dep}.py"
                    if os.path.exists(dep_file) and dep_file != doc:
                        try:
                            if dep in UNFETTERED_MODULES:
                                with open(dep_file, 'r', encoding='utf-8') as f:
                                    full_content = f.read()
                                injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n"
                            else:
                                with open(dep_file, 'r', encoding='utf-8') as f:
                                    skeleton = generate_skeleton(f.read())
                                injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n"
                        except Exception as e:
                            # Best-effort: a bad dependency must not block the run.
                            print(f"Error gathering context for {dep_file}: {e}")
    # Check for token-bloat safety: if injected_context is too large, truncate it
    if len(injected_context) > 15000:
        injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
    # MMA Protocol: Tier 3 and 4 are stateless and tool-less.
    system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \
                       "DO NOT USE ANY TOOLS (no write_file, no run_shell_command, etc.). " \
                       "ONLY output the requested text, code, or diff. No pleasantries."
    command_text = f"{system_directive}\n\nUse the mma-{role} skill. {injected_context}{prompt}"
    for doc in docs:
        command_text += f" @{doc}"
    # Resolve the executable ourselves instead of relying on shell=True.
    # With shell=True and a list, POSIX runs only the first item and hands the
    # rest to the shell itself, so the prompt and flags were silently dropped.
    # shutil.which honors PATHEXT on Windows, so `.cmd` shims are still found.
    executable = shutil.which('gemini') or 'gemini'
    # Use --approval-mode plan to ensure the agent is in read-only mode.
    cmd = [executable, '-p', command_text, '--output-format', 'json', '--model', model, '--approval-mode', 'plan']
    try:
        process = subprocess.run(cmd, capture_output=True, text=True)
        if not process.stdout and process.stderr:
            return f"Error: {process.stderr}"
        stdout = process.stdout
        # The CLI may print text before the JSON payload; skip to the first brace.
        start_index = stdout.find('{')
        if start_index != -1:
            json_str = stdout[start_index:]
            try:
                data = json.loads(json_str)
                return data.get('response', stdout)
            except json.JSONDecodeError:
                return stdout
        return stdout
    except Exception as e:
        return f"Execution failed: {str(e)}"
def create_parser():
    """Build the command-line parser: a required --role option and a positional prompt."""
    role_choices = [
        'tier1', 'tier2', 'tier3', 'tier4',
        'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa',
    ]
    cli = argparse.ArgumentParser(description="MMA Execution Script")
    cli.add_argument("--role", choices=role_choices, required=True, help="The tier role to execute")
    cli.add_argument("prompt", type=str, help="The prompt for the tier")
    return cli
def main():
    """CLI entry point: parse arguments, collect documents, run the agent, print the result.

    The document list starts from the role's defaults and is extended with
    every `@path` reference in the prompt that exists on disk and is not
    already listed.
    """
    args = create_parser().parse_args()
    docs = get_role_documents(args.role)
    # Extract @file references from the prompt
    import re
    for ref in re.findall(r"@([\w./\\]+)", args.prompt):
        if ref not in docs and os.path.exists(ref):
            docs.append(ref)
    print(f"Executing role: {args.role} with docs: {docs}")
    print(execute_agent(args.role, args.prompt, docs))


if __name__ == "__main__":
    main()