129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
import argparse
|
|
import subprocess
|
|
import json
|
|
import os
|
|
import tree_sitter
|
|
import tree_sitter_python
|
|
|
|
def generate_skeleton(code: str) -> str:
|
|
"""
|
|
Parses Python code and replaces function/method bodies with '...',
|
|
preserving docstrings if present.
|
|
"""
|
|
try:
|
|
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
|
|
parser = tree_sitter.Parser(PY_LANGUAGE)
|
|
tree = parser.parse(bytes(code, "utf8"))
|
|
|
|
edits = []
|
|
|
|
def is_docstring(node):
|
|
if node.type == "expression_statement" and node.child_count > 0:
|
|
if node.children[0].type == "string":
|
|
return True
|
|
return False
|
|
|
|
def walk(node):
|
|
if node.type == "function_definition":
|
|
body = node.child_by_field_name("body")
|
|
if body and body.type == "block":
|
|
indent = " " * body.start_point.column
|
|
first_stmt = None
|
|
for child in body.children:
|
|
if child.type != "comment":
|
|
first_stmt = child
|
|
break
|
|
|
|
if first_stmt and is_docstring(first_stmt):
|
|
start_byte = first_stmt.end_byte
|
|
end_byte = body.end_byte
|
|
if end_byte > start_byte:
|
|
edits.append((start_byte, end_byte, f"\n{indent}..."))
|
|
else:
|
|
start_byte = body.start_byte
|
|
end_byte = body.end_byte
|
|
edits.append((start_byte, end_byte, "..."))
|
|
|
|
for child in node.children:
|
|
walk(child)
|
|
|
|
walk(tree.root_node)
|
|
|
|
edits.sort(key=lambda x: x[0], reverse=True)
|
|
code_bytes = bytearray(code, "utf8")
|
|
for start, end, replacement in edits:
|
|
code_bytes[start:end] = bytes(replacement, "utf8")
|
|
|
|
return code_bytes.decode("utf8")
|
|
except Exception as e:
|
|
return f"# Error generating skeleton: {e}\n{code}"
|
|
|
|
def get_model_for_role(role: str) -> str:
|
|
"""Returns the specific model to use for a given tier role."""
|
|
if role == 'tier1-orchestrator' or role == 'tier1':
|
|
return 'gemini-3.1-pro-preview'
|
|
elif role == 'tier2-tech-lead' or role == 'tier2':
|
|
return 'gemini-3-flash-preview'
|
|
else:
|
|
return 'gemini-2.5-flash-lite'
|
|
|
|
def get_role_documents(role: str) -> list[str]:
|
|
if role == 'tier1-orchestrator' or role == 'tier1':
|
|
return ['conductor/product.md', 'conductor/product-guidelines.md']
|
|
elif role == 'tier2-tech-lead' or role == 'tier2':
|
|
return ['conductor/tech-stack.md', 'conductor/workflow.md']
|
|
elif role == 'tier3-worker' or role == 'tier3':
|
|
return ['conductor/workflow.md']
|
|
return []
|
|
|
|
def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
|
|
model = get_model_for_role(role)
|
|
command_text = f"Use the mma-{role} skill. {prompt}"
|
|
for doc in docs:
|
|
command_text += f" @{doc}"
|
|
|
|
cmd = ['gemini', '-p', command_text, '--output-format', 'json', '--model', model]
|
|
try:
|
|
process = subprocess.run(cmd, capture_output=True, text=True, shell=True)
|
|
if not process.stdout and process.stderr:
|
|
return f"Error: {process.stderr}"
|
|
|
|
stdout = process.stdout
|
|
start_index = stdout.find('{')
|
|
if start_index != -1:
|
|
json_str = stdout[start_index:]
|
|
try:
|
|
data = json.loads(json_str)
|
|
return data.get('response', stdout)
|
|
except json.JSONDecodeError:
|
|
return stdout
|
|
return stdout
|
|
except Exception as e:
|
|
return f"Execution failed: {str(e)}"
|
|
|
|
def create_parser():
|
|
parser = argparse.ArgumentParser(description="MMA Execution Script")
|
|
parser.add_argument(
|
|
"--role",
|
|
choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
|
|
required=True,
|
|
help="The tier role to execute"
|
|
)
|
|
parser.add_argument(
|
|
"prompt",
|
|
type=str,
|
|
help="The prompt for the tier"
|
|
)
|
|
return parser
|
|
|
|
def main():
|
|
parser = create_parser()
|
|
args = parser.parse_args()
|
|
|
|
docs = get_role_documents(args.role)
|
|
print(f"Executing role: {args.role} with docs: {docs}")
|
|
result = execute_agent(args.role, args.prompt, docs)
|
|
print(result)
|
|
|
|
if __name__ == "__main__":
|
|
main() |