This commit is contained in:
2026-02-25 23:11:42 -05:00
parent 7bbc484053
commit 3a2856b27d

View File

@@ -2,6 +2,7 @@ import argparse
import subprocess import subprocess
import json import json
import os import os
import tomllib
import tree_sitter import tree_sitter
import tree_sitter_python import tree_sitter_python
import ast import ast
@@ -122,7 +123,7 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
injected_context = "" injected_context = ""
# Whitelist of modules that sub-agents have "unfettered" (full) access to. # Whitelist of modules that sub-agents have "unfettered" (full) access to.
# These will be provided in full if imported, instead of just skeletons. # These will be provided in full if imported, instead of just skeletons.
UNFETTERED_MODULES = ['mcp_client', 'project_manager'] UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate']
if role in ['tier3', 'tier3-worker']: if role in ['tier3', 'tier3-worker']:
for doc in docs: for doc in docs:
@@ -131,6 +132,12 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
for dep in deps: for dep in deps:
# Only try to generate skeletons for files that exist in the local dir # Only try to generate skeletons for files that exist in the local dir
dep_file = f"{dep}.py" dep_file = f"{dep}.py"
# Optimization: If the dependency is already in 'docs' (explicitly provided),
# do NOT inject its skeleton/full context again as a dependency.
if dep_file in docs:
continue
if os.path.exists(dep_file) and dep_file != doc: if os.path.exists(dep_file) and dep_file != doc:
try: try:
if dep in UNFETTERED_MODULES: if dep in UNFETTERED_MODULES:
@@ -148,19 +155,45 @@ def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
if len(injected_context) > 15000: if len(injected_context) > 15000:
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
# MMA Protocol: Tier 3 and 4 are stateless and tool-less. # MMA Protocol: Tier 3 and 4 are stateless.
system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ if role in ['tier3', 'tier3-worker']:
"DO NOT USE ANY TOOLS (no write_file, no run_shell_command, etc.). " \ system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \
"ONLY output the requested text, code, or diff. No pleasantries." "Your goal is to generate high-quality code or diffs based on the provided task. " \
"DO NOT USE ANY TOOLS (no write_file, no run_shell_command, etc.). " \
"ONLY output the clean code or the requested diff. No pleasantries, no conversational filler."
elif role in ['tier4', 'tier4-qa']:
system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \
"Your goal is to analyze errors, summarize logs, or verify tests. " \
"DO NOT USE ANY TOOLS. ONLY output the requested analysis. No pleasantries."
else:
system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \
"DO NOT USE ANY TOOLS. ONLY output the requested text. No pleasantries."
command_text = f"{system_directive}\n\nUse the mma-{role} skill. {injected_context}{prompt}" command_text = f"{system_directive}\n\n{injected_context}\n\n"
for doc in docs:
command_text += f" @{doc}"
# Use --approval-mode plan to ensure the agent is in read-only mode. # Manually inline documents to ensure sub-agent has context in headless mode
cmd = ['gemini', '-p', command_text, '--output-format', 'json', '--model', model, '--approval-mode', 'plan'] for doc in docs:
if os.path.exists(doc):
try:
with open(doc, 'r', encoding='utf-8') as f:
content = f.read()
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
except Exception as e:
print(f"Error inlining {doc}: {e}")
command_text += f"\n\nTASK: {prompt}\n\n"
# Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206.
# We use -p 'task' to ensure non-interactive (headless) mode and valid parsing.
ps_command = (
f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; "
f"gemini -p 'task' --output-format json --model {model}"
)
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
try: try:
process = subprocess.run(cmd, capture_output=True, text=True, shell=True) process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8')
if not process.stdout and process.stderr: if not process.stdout and process.stderr:
return f"Error: {process.stderr}" return f"Error: {process.stderr}"
@@ -182,13 +215,18 @@ def create_parser():
parser.add_argument( parser.add_argument(
"--role", "--role",
choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'], choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
required=True,
help="The tier role to execute" help="The tier role to execute"
) )
parser.add_argument(
"--task-file",
type=str,
help="TOML file defining the task"
)
parser.add_argument( parser.add_argument(
"prompt", "prompt",
type=str, type=str,
help="The prompt for the tier" nargs='?',
help="The prompt for the tier (optional if --task-file is used)"
) )
return parser return parser
@@ -196,17 +234,33 @@ def main():
parser = create_parser() parser = create_parser()
args = parser.parse_args() args = parser.parse_args()
docs = get_role_documents(args.role) role = args.role
prompt = args.prompt
docs = []
if args.task_file and os.path.exists(args.task_file):
with open(args.task_file, "rb") as f:
task_data = tomllib.load(f)
role = task_data.get("role", role)
prompt = task_data.get("prompt", prompt)
docs = task_data.get("docs", [])
if not role or not prompt:
parser.print_help()
return
if not docs:
docs = get_role_documents(role)
# Extract @file references from the prompt # Extract @file references from the prompt
import re import re
file_refs = re.findall(r"@([\w./\\]+)", args.prompt) file_refs = re.findall(r"@([\w./\\]+)", prompt)
for ref in file_refs: for ref in file_refs:
if os.path.exists(ref) and ref not in docs: if os.path.exists(ref) and ref not in docs:
docs.append(ref) docs.append(ref)
print(f"Executing role: {args.role} with docs: {docs}") print(f"Executing role: {role} with docs: {docs}")
result = execute_agent(args.role, args.prompt, docs) result = execute_agent(role, prompt, docs)
print(result) print(result)
if __name__ == "__main__": if __name__ == "__main__":