checkpoint: Claude Code integration + implement missing MCP var tools

Add Claude Code conductor commands, MCP server, MMA exec scripts,
and implement py_get_var_declaration / py_set_var_declaration which
were registered in dispatch and tool specs but had no function bodies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 10:47:42 -05:00
parent d36632c21a
commit a2a1447f58
22 changed files with 1845 additions and 0 deletions

289
scripts/claude_mma_exec.py Normal file
View File

@@ -0,0 +1,289 @@
import argparse
import subprocess
import os
import ast
import datetime
import re
import tomllib
import tree_sitter
import tree_sitter_python
LOG_FILE = 'logs/claude_mma_delegation.log'
MODEL_MAP = {
'tier1-orchestrator': 'claude-opus-4-6',
'tier1': 'claude-opus-4-6',
'tier2-tech-lead': 'claude-sonnet-4-6',
'tier2': 'claude-sonnet-4-6',
'tier3-worker': 'claude-sonnet-4-6',
'tier3': 'claude-sonnet-4-6',
'tier4-qa': 'claude-haiku-4-5',
'tier4': 'claude-haiku-4-5',
}
def generate_skeleton(code: str) -> str:
"""
Parses Python code and replaces function/method bodies with '...',
preserving docstrings if present.
"""
try:
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
parser = tree_sitter.Parser(PY_LANGUAGE)
tree = parser.parse(bytes(code, "utf8"))
edits = []
def is_docstring(node):
if node.type == "expression_statement" and node.child_count > 0:
if node.children[0].type == "string":
return True
return False
def walk(node):
if node.type == "function_definition":
body = node.child_by_field_name("body")
if body and body.type == "block":
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
first_stmt = child
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
for child in node.children:
walk(child)
walk(tree.root_node)
edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8")
for start, end, replacement in edits:
code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
except Exception as e:
return f"# Error generating skeleton: {e}\n{code}"
def get_model_for_role(role: str) -> str:
"""Returns the Claude model to use for a given tier role."""
return MODEL_MAP.get(role, 'claude-haiku-4-5')
def get_role_documents(role: str) -> list[str]:
if role in ('tier1-orchestrator', 'tier1'):
return ['conductor/product.md', 'conductor/product-guidelines.md']
elif role in ('tier2-tech-lead', 'tier2'):
return ['conductor/tech-stack.md', 'conductor/workflow.md']
elif role in ('tier3-worker', 'tier3'):
return ['conductor/workflow.md']
return []
def log_delegation(role, full_prompt, result=None, summary_prompt=None):
os.makedirs('logs/claude_agents', exist_ok=True)
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log'
with open(log_file, 'w', encoding='utf-8') as f:
f.write("==================================================\n")
f.write(f"ROLE: {role}\n")
f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("--------------------------------------------------\n")
f.write(f"FULL PROMPT:\n{full_prompt}\n")
f.write("--------------------------------------------------\n")
if result:
f.write(f"RESULT:\n{result}\n")
f.write("==================================================\n")
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
display_prompt = summary_prompt if summary_prompt else full_prompt
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
return log_file
def get_dependencies(filepath: str) -> list[str]:
"""Identify top-level module imports from a Python file."""
try:
with open(filepath, 'r', encoding='utf-8') as f:
tree = ast.parse(f.read())
dependencies = []
for node in tree.body:
if isinstance(node, ast.Import):
for alias in node.names:
dependencies.append(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
dependencies.append(node.module.split('.')[0])
seen = set()
result = []
for d in dependencies:
if d not in seen:
result.append(d)
seen.add(d)
return result
except Exception as e:
print(f"Error getting dependencies for {filepath}: {e}")
return []
def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
model = get_model_for_role(role)
# Advanced Context: Dependency skeletons for Tier 3
injected_context = ""
UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate']
if role in ['tier3', 'tier3-worker']:
for doc in docs:
if doc.endswith('.py') and os.path.exists(doc):
deps = get_dependencies(doc)
for dep in deps:
dep_file = f"{dep}.py"
if dep_file in docs:
continue
if os.path.exists(dep_file) and dep_file != doc:
try:
if dep in UNFETTERED_MODULES:
with open(dep_file, 'r', encoding='utf-8') as f:
full_content = f.read()
injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n"
else:
with open(dep_file, 'r', encoding='utf-8') as f:
skeleton = generate_skeleton(f.read())
injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n"
except Exception as e:
print(f"Error gathering context for {dep_file}: {e}")
if len(injected_context) > 15000:
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
# MMA Protocol: Tier 3 and 4 are stateless. Build system directive.
if role in ['tier3', 'tier3-worker']:
system_directive = (
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). "
"Your goal is to implement specific code changes or tests based on the provided task. "
"You have access to tools for reading and writing files (Read, Write, Edit), "
"codebase investigation (Glob, Grep), "
"version control (Bash git commands), and web tools (WebFetch, WebSearch). "
"You CAN execute PowerShell scripts via Bash for verification and testing. "
"Follow TDD and return success status or code changes. No pleasantries, no conversational filler."
)
elif role in ['tier4', 'tier4-qa']:
system_directive = (
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. "
"Your goal is to analyze errors, summarize logs, or verify tests. "
"You have access to tools for reading files and exploring the codebase (Read, Glob, Grep). "
"You CAN execute PowerShell scripts via Bash (read-only) for diagnostics. "
"ONLY output the requested analysis. No pleasantries."
)
else:
system_directive = (
f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. "
"ONLY output the requested text. No pleasantries."
)
command_text = f"{system_directive}\n\n{injected_context}\n\n"
# Inline documents to ensure sub-agent has context in headless mode
for doc in docs:
if os.path.exists(doc):
try:
with open(doc, 'r', encoding='utf-8') as f:
content = f.read()
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
except Exception as e:
print(f"Error inlining {doc}: {e}")
command_text += f"\n\nTASK: {prompt}\n\n"
# Spawn claude CLI non-interactively via PowerShell
ps_command = (
"if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') "
"{ . 'C:\\projects\\misc\\setup_claude.ps1' }; "
f"claude --model {model} --print"
)
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
try:
env = os.environ.copy()
env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless'
process = subprocess.run(
cmd,
input=command_text,
capture_output=True,
text=True,
encoding='utf-8',
env=env
)
# claude --print outputs plain text — no JSON parsing needed
result = process.stdout if process.stdout else f"Error: {process.stderr}"
log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
print(f"Sub-agent log created: {log_file}")
return result
except Exception as e:
err_msg = f"Execution failed: {str(e)}"
log_delegation(role, command_text, err_msg)
return err_msg
def create_parser():
parser = argparse.ArgumentParser(description="Claude MMA Execution Script")
parser.add_argument(
"--role",
choices=['tier1', 'tier2', 'tier3', 'tier4',
'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
help="The tier role to execute"
)
parser.add_argument(
"--task-file",
type=str,
help="TOML file defining the task"
)
parser.add_argument(
"prompt",
type=str,
nargs='?',
help="The prompt for the tier (optional if --task-file is used)"
)
return parser
def main():
parser = create_parser()
args = parser.parse_args()
role = args.role
prompt = args.prompt
docs = []
if args.task_file and os.path.exists(args.task_file):
with open(args.task_file, "rb") as f:
task_data = tomllib.load(f)
role = task_data.get("role", role)
prompt = task_data.get("prompt", prompt)
docs = task_data.get("docs", [])
if not role or not prompt:
parser.print_help()
return
if not docs:
docs = get_role_documents(role)
# Extract @file references from the prompt
file_refs = re.findall(r"@([\w./\\]+)", prompt)
for ref in file_refs:
if os.path.exists(ref) and ref not in docs:
docs.append(ref)
print(f"Executing role: {role} with docs: {docs}")
result = execute_agent(role, prompt, docs)
print(result)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,88 @@
import sys
import json
import logging
import os
# Add project root to sys.path so we can import api_hook_client
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
sys.path.append(project_root)
try:
from api_hook_client import ApiHookClient
except ImportError:
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
sys.exit(1)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
logging.debug("Claude Tool Bridge script started.")
try:
input_data = sys.stdin.read()
if not input_data:
logging.debug("No input received from stdin. Exiting gracefully.")
return
logging.debug(f"Received raw input data: {input_data}")
try:
hook_input = json.loads(input_data)
except json.JSONDecodeError:
logging.error("Failed to decode JSON from stdin.")
print(json.dumps({"decision": "deny", "reason": "Invalid JSON received from stdin."}))
return
# Claude Code PreToolUse hook format: tool_name + tool_input
tool_name = hook_input.get('tool_name')
tool_input = hook_input.get('tool_input', {})
if tool_name is None:
logging.error("Could not determine tool name from input. Expected 'tool_name'.")
print(json.dumps({"decision": "deny", "reason": "Missing 'tool_name' in hook input."}))
return
if not isinstance(tool_input, dict):
logging.warning(f"tool_input is not a dict: {tool_input}. Treating as empty.")
tool_input = {}
logging.debug(f"Resolved tool_name: '{tool_name}', tool_input: {tool_input}")
# Check context — if not running via Manual Slop, pass through
hook_context = os.environ.get("CLAUDE_CLI_HOOK_CONTEXT")
logging.debug(f"Checking CLAUDE_CLI_HOOK_CONTEXT: '{hook_context}'")
if hook_context == 'mma_headless':
# Sub-agents in headless MMA mode: auto-allow all tools
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'mma_headless'. Allowing for sub-agent.")
print(json.dumps({"decision": "allow", "reason": "Sub-agent headless mode (MMA)."}))
return
if hook_context != 'manual_slop':
# Not a programmatic Manual Slop session — allow through silently
logging.debug(f"CLAUDE_CLI_HOOK_CONTEXT is '{hook_context}', not 'manual_slop'. Allowing.")
print(json.dumps({"decision": "allow", "reason": f"Non-programmatic usage (CLAUDE_CLI_HOOK_CONTEXT={hook_context})."}))
return
# manual_slop context: route to GUI for approval
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'manual_slop'. Routing to API Hook Client.")
client = ApiHookClient(base_url="http://127.0.0.1:8999")
try:
logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_input}")
response = client.request_confirmation(tool_name, tool_input)
if response and response.get('approved') is True:
logging.debug("User approved tool execution.")
print(json.dumps({"decision": "allow"}))
else:
reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.'
logging.debug(f"User denied tool execution. Reason: {reason}")
print(json.dumps({"decision": "deny", "reason": reason}))
except Exception as e:
logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
print(json.dumps({"decision": "deny", "reason": f"Manual Slop hook server unreachable: {str(e)}"}))
except Exception as e:
logging.error(f"Unexpected error in bridge: {str(e)}", exc_info=True)
print(json.dumps({"decision": "deny", "reason": f"Internal bridge error: {str(e)}"}))
if __name__ == "__main__":
main()

91
scripts/mcp_server.py Normal file
View File

@@ -0,0 +1,91 @@
"""
MCP server exposing Manual Slop's custom tools (mcp_client.py) to Claude Code.
All 26 tools from mcp_client.MCP_TOOL_SPECS are served, plus run_powershell.
Delegates to mcp_client.dispatch() for all tools except run_powershell,
which routes through shell_runner.run_powershell() directly.
Usage (in .claude/settings.json mcpServers):
"command": "uv", "args": ["run", "python", "scripts/mcp_server.py"]
"""
import asyncio
import os
import sys
# Add project root to sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import mcp_client
import shell_runner
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# run_powershell is handled by shell_runner, not mcp_client.dispatch()
# Define its spec here since it's not in MCP_TOOL_SPECS
RUN_POWERSHELL_SPEC = {
"name": "run_powershell",
"description": (
"Run a PowerShell script within the project base directory. "
"Returns combined stdout, stderr, and exit code. "
"60-second timeout. Use for builds, tests, and system commands."
),
"parameters": {
"type": "object",
"properties": {
"script": {
"type": "string",
"description": "PowerShell script content to execute."
}
},
"required": ["script"]
}
}
server = Server("manual-slop-tools")
@server.list_tools()
async def list_tools() -> list[Tool]:
tools = []
for spec in mcp_client.MCP_TOOL_SPECS:
tools.append(Tool(
name=spec["name"],
description=spec["description"],
inputSchema=spec["parameters"],
))
# Add run_powershell
tools.append(Tool(
name=RUN_POWERSHELL_SPEC["name"],
description=RUN_POWERSHELL_SPEC["description"],
inputSchema=RUN_POWERSHELL_SPEC["parameters"],
))
return tools
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "run_powershell":
script = arguments.get("script", "")
result = shell_runner.run_powershell(script, os.getcwd())
else:
result = mcp_client.dispatch(name, arguments)
return [TextContent(type="text", text=str(result))]
except Exception as e:
return [TextContent(type="text", text=f"ERROR: {e}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
asyncio.run(main())