refactor(indentation): Apply codebase-wide 1-space ultra-compact refactor. Formatted 21 core modules and tests.
This commit is contained in:
@@ -29,6 +29,7 @@ def has_value_return(node: ast.AST) -> bool:
|
||||
def collect_auto_none(tree: ast.Module) -> list[tuple[str, ast.AST]]:
|
||||
"""Collect functions that can safely get -> None annotation."""
|
||||
results = []
|
||||
|
||||
def scan(scope, prefix=""):
|
||||
for node in ast.iter_child_nodes(scope):
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
@@ -61,9 +62,9 @@ def apply_return_none_single_pass(filepath: str) -> int:
|
||||
for name, node in candidates:
|
||||
if not node.body:
|
||||
continue
|
||||
# The colon is on the last line of the signature
|
||||
# For single-line defs: `def foo(self):` -> colon at end
|
||||
# For multi-line defs: last line ends with `):` or similar
|
||||
# The colon is on the last line of the signature
|
||||
# For single-line defs: `def foo(self):` -> colon at end
|
||||
# For multi-line defs: last line ends with `):` or similar
|
||||
body_start = node.body[0].lineno # 1-indexed
|
||||
sig_last_line_idx = body_start - 2 # 0-indexed, the line before body
|
||||
# But for single-line signatures, sig_last_line_idx == node.lineno - 1
|
||||
@@ -96,11 +97,11 @@ def apply_return_none_single_pass(filepath: str) -> int:
|
||||
if colon_idx < 0:
|
||||
stats["errors"].append(f"no colon found: {filepath}:{name} L{sig_last_line_idx+1}")
|
||||
continue
|
||||
# Check not already annotated
|
||||
# Check not already annotated
|
||||
if '->' in code_part:
|
||||
continue
|
||||
edits.append((sig_last_line_idx, colon_idx))
|
||||
# Apply edits in reverse order to preserve line indices
|
||||
# Apply edits in reverse order to preserve line indices
|
||||
edits.sort(key=lambda x: x[0], reverse=True)
|
||||
count = 0
|
||||
for line_idx, colon_col in edits:
|
||||
@@ -111,11 +112,10 @@ def apply_return_none_single_pass(filepath: str) -> int:
|
||||
with open(fp, 'w', encoding='utf-8', newline='') as f:
|
||||
f.writelines(lines)
|
||||
return count
|
||||
|
||||
# --- Manual signature replacements ---
|
||||
# These use regex on the def line to do a targeted replacement.
|
||||
# Each entry: (dotted_name, old_params_pattern, new_full_sig_line)
|
||||
# We match by finding the exact def line and replacing it.
|
||||
# --- Manual signature replacements ---
|
||||
# These use regex on the def line to do a targeted replacement.
|
||||
# Each entry: (dotted_name, old_params_pattern, new_full_sig_line)
|
||||
# We match by finding the exact def line and replacing it.
|
||||
|
||||
def apply_manual_sigs(filepath: str, sig_replacements: list[tuple[str, str]]) -> int:
|
||||
"""Apply manual signature replacements.
|
||||
@@ -164,10 +164,9 @@ def verify_syntax(filepath: str) -> str:
|
||||
return f"Syntax OK: {filepath}"
|
||||
except SyntaxError as e:
|
||||
return f"SyntaxError in {filepath} at line {e.lineno}: {e.msg}"
|
||||
|
||||
# ============================================================
|
||||
# gui_2.py manual signatures (Tier 3 items)
|
||||
# ============================================================
|
||||
# ============================================================
|
||||
# gui_2.py manual signatures (Tier 3 items)
|
||||
# ============================================================
|
||||
GUI2_MANUAL_SIGS: list[tuple[str, str]] = [
|
||||
(r'def resolve_pending_action\(self, action_id: str, approved: bool\):',
|
||||
r'def resolve_pending_action(self, action_id: str, approved: bool) -> bool:'),
|
||||
@@ -281,7 +280,6 @@ if __name__ == "__main__":
|
||||
n = apply_return_none_single_pass("gui_legacy.py")
|
||||
stats["auto_none"] += n
|
||||
print(f" gui_legacy.py: {n} applied")
|
||||
|
||||
# Verify syntax after Phase A
|
||||
for f in ["gui_2.py", "gui_legacy.py"]:
|
||||
r = verify_syntax(f)
|
||||
@@ -289,7 +287,6 @@ if __name__ == "__main__":
|
||||
print(f" ABORT: {r}")
|
||||
sys.exit(1)
|
||||
print(" Syntax OK after Phase A")
|
||||
|
||||
print("\n=== Phase B: Manual signatures (regex) ===")
|
||||
n = apply_manual_sigs("gui_2.py", GUI2_MANUAL_SIGS)
|
||||
stats["manual_sig"] += n
|
||||
@@ -297,7 +294,6 @@ if __name__ == "__main__":
|
||||
n = apply_manual_sigs("gui_legacy.py", LEGACY_MANUAL_SIGS)
|
||||
stats["manual_sig"] += n
|
||||
print(f" gui_legacy.py: {n} applied")
|
||||
|
||||
# Verify syntax after Phase B
|
||||
for f in ["gui_2.py", "gui_legacy.py"]:
|
||||
r = verify_syntax(f)
|
||||
@@ -305,9 +301,9 @@ if __name__ == "__main__":
|
||||
print(f" ABORT: {r}")
|
||||
sys.exit(1)
|
||||
print(" Syntax OK after Phase B")
|
||||
|
||||
print("\n=== Phase C: Variable annotations (regex) ===")
|
||||
# Use re.MULTILINE so ^ matches line starts
|
||||
|
||||
def apply_var_replacements_m(filepath, replacements):
|
||||
fp = abs_path(filepath)
|
||||
with open(fp, 'r', encoding='utf-8') as f:
|
||||
@@ -323,14 +319,12 @@ if __name__ == "__main__":
|
||||
with open(fp, 'w', encoding='utf-8', newline='') as f:
|
||||
f.write(code)
|
||||
return count
|
||||
|
||||
n = apply_var_replacements_m("gui_2.py", GUI2_VAR_REPLACEMENTS)
|
||||
stats["vars"] += n
|
||||
print(f" gui_2.py: {n} applied")
|
||||
n = apply_var_replacements_m("gui_legacy.py", LEGACY_VAR_REPLACEMENTS)
|
||||
stats["vars"] += n
|
||||
print(f" gui_legacy.py: {n} applied")
|
||||
|
||||
print("\n=== Final Syntax Verification ===")
|
||||
all_ok = True
|
||||
for f in ["gui_2.py", "gui_legacy.py"]:
|
||||
@@ -338,7 +332,6 @@ if __name__ == "__main__":
|
||||
print(f" {f}: {r}")
|
||||
if "Error" in r:
|
||||
all_ok = False
|
||||
|
||||
print(f"\n=== Summary ===")
|
||||
print(f" Auto -> None: {stats['auto_none']}")
|
||||
print(f" Manual sigs: {stats['manual_sig']}")
|
||||
|
||||
@@ -11,279 +11,256 @@ import tree_sitter_python
|
||||
LOG_FILE: str = 'logs/claude_mma_delegation.log'
|
||||
|
||||
MODEL_MAP: dict[str, str] = {
|
||||
'tier1-orchestrator': 'claude-opus-4-6',
|
||||
'tier1': 'claude-opus-4-6',
|
||||
'tier2-tech-lead': 'claude-sonnet-4-6',
|
||||
'tier2': 'claude-sonnet-4-6',
|
||||
'tier3-worker': 'claude-sonnet-4-6',
|
||||
'tier3': 'claude-sonnet-4-6',
|
||||
'tier4-qa': 'claude-haiku-4-5',
|
||||
'tier4': 'claude-haiku-4-5',
|
||||
'tier1-orchestrator': 'claude-opus-4-6',
|
||||
'tier1': 'claude-opus-4-6',
|
||||
'tier2-tech-lead': 'claude-sonnet-4-6',
|
||||
'tier2': 'claude-sonnet-4-6',
|
||||
'tier3-worker': 'claude-sonnet-4-6',
|
||||
'tier3': 'claude-sonnet-4-6',
|
||||
'tier4-qa': 'claude-haiku-4-5',
|
||||
'tier4': 'claude-haiku-4-5',
|
||||
}
|
||||
|
||||
|
||||
def generate_skeleton(code: str) -> str:
|
||||
"""
|
||||
"""
|
||||
Parses Python code and replaces function/method bodies with '...',
|
||||
preserving docstrings if present.
|
||||
"""
|
||||
try:
|
||||
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
|
||||
parser = tree_sitter.Parser(PY_LANGUAGE)
|
||||
tree = parser.parse(bytes(code, "utf8"))
|
||||
edits = []
|
||||
try:
|
||||
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
|
||||
parser = tree_sitter.Parser(PY_LANGUAGE)
|
||||
tree = parser.parse(bytes(code, "utf8"))
|
||||
edits = []
|
||||
|
||||
def is_docstring(node):
|
||||
if node.type == "expression_statement" and node.child_count > 0:
|
||||
if node.children[0].type == "string":
|
||||
return True
|
||||
return False
|
||||
|
||||
def walk(node):
|
||||
if node.type == "function_definition":
|
||||
body = node.child_by_field_name("body")
|
||||
if body and body.type == "block":
|
||||
indent = " " * body.start_point.column
|
||||
first_stmt = None
|
||||
for child in body.children:
|
||||
if child.type != "comment":
|
||||
first_stmt = child
|
||||
break
|
||||
if first_stmt and is_docstring(first_stmt):
|
||||
start_byte = first_stmt.end_byte
|
||||
end_byte = body.end_byte
|
||||
if end_byte > start_byte:
|
||||
edits.append((start_byte, end_byte, f"\n{indent}..."))
|
||||
else:
|
||||
start_byte = body.start_byte
|
||||
end_byte = body.end_byte
|
||||
edits.append((start_byte, end_byte, "..."))
|
||||
for child in node.children:
|
||||
walk(child)
|
||||
|
||||
walk(tree.root_node)
|
||||
edits.sort(key=lambda x: x[0], reverse=True)
|
||||
code_bytes = bytearray(code, "utf8")
|
||||
for start, end, replacement in edits:
|
||||
code_bytes[start:end] = bytes(replacement, "utf8")
|
||||
return code_bytes.decode("utf8")
|
||||
except Exception as e:
|
||||
return f"# Error generating skeleton: {e}\n{code}"
|
||||
def is_docstring(node):
|
||||
if node.type == "expression_statement" and node.child_count > 0:
|
||||
if node.children[0].type == "string":
|
||||
return True
|
||||
return False
|
||||
|
||||
def walk(node):
|
||||
if node.type == "function_definition":
|
||||
body = node.child_by_field_name("body")
|
||||
if body and body.type == "block":
|
||||
indent = " " * body.start_point.column
|
||||
first_stmt = None
|
||||
for child in body.children:
|
||||
if child.type != "comment":
|
||||
first_stmt = child
|
||||
break
|
||||
if first_stmt and is_docstring(first_stmt):
|
||||
start_byte = first_stmt.end_byte
|
||||
end_byte = body.end_byte
|
||||
if end_byte > start_byte:
|
||||
edits.append((start_byte, end_byte, f"\n{indent}..."))
|
||||
else:
|
||||
start_byte = body.start_byte
|
||||
end_byte = body.end_byte
|
||||
edits.append((start_byte, end_byte, "..."))
|
||||
for child in node.children:
|
||||
walk(child)
|
||||
walk(tree.root_node)
|
||||
edits.sort(key=lambda x: x[0], reverse=True)
|
||||
code_bytes = bytearray(code, "utf8")
|
||||
for start, end, replacement in edits:
|
||||
code_bytes[start:end] = bytes(replacement, "utf8")
|
||||
return code_bytes.decode("utf8")
|
||||
except Exception as e:
|
||||
return f"# Error generating skeleton: {e}\n{code}"
|
||||
|
||||
def get_model_for_role(role: str) -> str:
|
||||
"""Returns the Claude model to use for a given tier role."""
|
||||
return MODEL_MAP.get(role, 'claude-haiku-4-5')
|
||||
|
||||
"""Returns the Claude model to use for a given tier role."""
|
||||
return MODEL_MAP.get(role, 'claude-haiku-4-5')
|
||||
|
||||
def get_role_documents(role: str) -> list[str]:
|
||||
if role in ('tier1-orchestrator', 'tier1'):
|
||||
return ['conductor/product.md', 'conductor/product-guidelines.md']
|
||||
elif role in ('tier2-tech-lead', 'tier2'):
|
||||
return ['conductor/tech-stack.md', 'conductor/workflow.md']
|
||||
elif role in ('tier3-worker', 'tier3'):
|
||||
return ['conductor/workflow.md']
|
||||
return []
|
||||
|
||||
if role in ('tier1-orchestrator', 'tier1'):
|
||||
return ['conductor/product.md', 'conductor/product-guidelines.md']
|
||||
elif role in ('tier2-tech-lead', 'tier2'):
|
||||
return ['conductor/tech-stack.md', 'conductor/workflow.md']
|
||||
elif role in ('tier3-worker', 'tier3'):
|
||||
return ['conductor/workflow.md']
|
||||
return []
|
||||
|
||||
def log_delegation(role: str, full_prompt: str, result: str | None = None, summary_prompt: str | None = None) -> str:
|
||||
os.makedirs('logs/claude_agents', exist_ok=True)
|
||||
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log'
|
||||
with open(log_file, 'w', encoding='utf-8') as f:
|
||||
f.write("==================================================\n")
|
||||
f.write(f"ROLE: {role}\n")
|
||||
f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write("--------------------------------------------------\n")
|
||||
f.write(f"FULL PROMPT:\n{full_prompt}\n")
|
||||
f.write("--------------------------------------------------\n")
|
||||
if result:
|
||||
f.write(f"RESULT:\n{result}\n")
|
||||
f.write("==================================================\n")
|
||||
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
|
||||
display_prompt = summary_prompt if summary_prompt else full_prompt
|
||||
with open(LOG_FILE, 'a', encoding='utf-8') as f:
|
||||
f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
|
||||
return log_file
|
||||
|
||||
os.makedirs('logs/claude_agents', exist_ok=True)
|
||||
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
log_file = f'logs/claude_agents/claude_{role}_task_{timestamp}.log'
|
||||
with open(log_file, 'w', encoding='utf-8') as f:
|
||||
f.write("==================================================\n")
|
||||
f.write(f"ROLE: {role}\n")
|
||||
f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write("--------------------------------------------------\n")
|
||||
f.write(f"FULL PROMPT:\n{full_prompt}\n")
|
||||
f.write("--------------------------------------------------\n")
|
||||
if result:
|
||||
f.write(f"RESULT:\n{result}\n")
|
||||
f.write("==================================================\n")
|
||||
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
|
||||
display_prompt = summary_prompt if summary_prompt else full_prompt
|
||||
with open(LOG_FILE, 'a', encoding='utf-8') as f:
|
||||
f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
|
||||
return log_file
|
||||
|
||||
def get_dependencies(filepath: str) -> list[str]:
|
||||
"""Identify top-level module imports from a Python file."""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8') as f:
|
||||
tree = ast.parse(f.read())
|
||||
dependencies = []
|
||||
for node in tree.body:
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
dependencies.append(alias.name.split('.')[0])
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
dependencies.append(node.module.split('.')[0])
|
||||
seen = set()
|
||||
result = []
|
||||
for d in dependencies:
|
||||
if d not in seen:
|
||||
result.append(d)
|
||||
seen.add(d)
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"Error getting dependencies for {filepath}: {e}")
|
||||
return []
|
||||
|
||||
"""Identify top-level module imports from a Python file."""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8') as f:
|
||||
tree = ast.parse(f.read())
|
||||
dependencies = []
|
||||
for node in tree.body:
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
dependencies.append(alias.name.split('.')[0])
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
dependencies.append(node.module.split('.')[0])
|
||||
seen = set()
|
||||
result = []
|
||||
for d in dependencies:
|
||||
if d not in seen:
|
||||
result.append(d)
|
||||
seen.add(d)
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"Error getting dependencies for {filepath}: {e}")
|
||||
return []
|
||||
|
||||
def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
|
||||
model = get_model_for_role(role)
|
||||
|
||||
# Advanced Context: Dependency skeletons for Tier 3
|
||||
injected_context = ""
|
||||
UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate']
|
||||
|
||||
if role in ['tier3', 'tier3-worker']:
|
||||
for doc in docs:
|
||||
if doc.endswith('.py') and os.path.exists(doc):
|
||||
deps = get_dependencies(doc)
|
||||
for dep in deps:
|
||||
dep_file = f"{dep}.py"
|
||||
if dep_file in docs:
|
||||
continue
|
||||
if os.path.exists(dep_file) and dep_file != doc:
|
||||
try:
|
||||
if dep in UNFETTERED_MODULES:
|
||||
with open(dep_file, 'r', encoding='utf-8') as f:
|
||||
full_content = f.read()
|
||||
injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n"
|
||||
else:
|
||||
with open(dep_file, 'r', encoding='utf-8') as f:
|
||||
skeleton = generate_skeleton(f.read())
|
||||
injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n"
|
||||
except Exception as e:
|
||||
print(f"Error gathering context for {dep_file}: {e}")
|
||||
if len(injected_context) > 15000:
|
||||
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
|
||||
|
||||
# MMA Protocol: Tier 3 and 4 are stateless. Build system directive.
|
||||
if role in ['tier3', 'tier3-worker']:
|
||||
system_directive = (
|
||||
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). "
|
||||
"Your goal is to implement specific code changes or tests based on the provided task. "
|
||||
"You have access to tools for reading and writing files (Read, Write, Edit), "
|
||||
"codebase investigation (Glob, Grep), "
|
||||
"version control (Bash git commands), and web tools (WebFetch, WebSearch). "
|
||||
"You CAN execute PowerShell scripts via Bash for verification and testing. "
|
||||
"Follow TDD and return success status or code changes. No pleasantries, no conversational filler."
|
||||
)
|
||||
elif role in ['tier4', 'tier4-qa']:
|
||||
system_directive = (
|
||||
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. "
|
||||
"Your goal is to analyze errors, summarize logs, or verify tests. "
|
||||
"You have access to tools for reading files and exploring the codebase (Read, Glob, Grep). "
|
||||
"You CAN execute PowerShell scripts via Bash (read-only) for diagnostics. "
|
||||
"ONLY output the requested analysis. No pleasantries."
|
||||
)
|
||||
else:
|
||||
system_directive = (
|
||||
f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. "
|
||||
"ONLY output the requested text. No pleasantries."
|
||||
)
|
||||
|
||||
command_text = f"{system_directive}\n\n{injected_context}\n\n"
|
||||
|
||||
# Inline documents to ensure sub-agent has context in headless mode
|
||||
for doc in docs:
|
||||
if os.path.exists(doc):
|
||||
try:
|
||||
with open(doc, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
|
||||
except Exception as e:
|
||||
print(f"Error inlining {doc}: {e}")
|
||||
|
||||
command_text += f"\n\nTASK: {prompt}\n\n"
|
||||
|
||||
# Spawn claude CLI non-interactively via PowerShell
|
||||
ps_command = (
|
||||
"if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') "
|
||||
"{ . 'C:\\projects\\misc\\setup_claude.ps1' }; "
|
||||
f"claude --model {model} --print"
|
||||
)
|
||||
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
|
||||
|
||||
try:
|
||||
env = os.environ.copy()
|
||||
env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless'
|
||||
process = subprocess.run(
|
||||
cmd,
|
||||
input=command_text,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
env=env
|
||||
)
|
||||
# claude --print outputs plain text — no JSON parsing needed
|
||||
result = process.stdout if process.stdout else f"Error: {process.stderr}"
|
||||
log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
|
||||
print(f"Sub-agent log created: {log_file}")
|
||||
return result
|
||||
except Exception as e:
|
||||
err_msg = f"Execution failed: {str(e)}"
|
||||
log_delegation(role, command_text, err_msg)
|
||||
return err_msg
|
||||
|
||||
model = get_model_for_role(role)
|
||||
# Advanced Context: Dependency skeletons for Tier 3
|
||||
injected_context = ""
|
||||
UNFETTERED_MODULES: list[str] = ['mcp_client', 'project_manager', 'events', 'aggregate']
|
||||
if role in ['tier3', 'tier3-worker']:
|
||||
for doc in docs:
|
||||
if doc.endswith('.py') and os.path.exists(doc):
|
||||
deps = get_dependencies(doc)
|
||||
for dep in deps:
|
||||
dep_file = f"{dep}.py"
|
||||
if dep_file in docs:
|
||||
continue
|
||||
if os.path.exists(dep_file) and dep_file != doc:
|
||||
try:
|
||||
if dep in UNFETTERED_MODULES:
|
||||
with open(dep_file, 'r', encoding='utf-8') as f:
|
||||
full_content = f.read()
|
||||
injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n"
|
||||
else:
|
||||
with open(dep_file, 'r', encoding='utf-8') as f:
|
||||
skeleton = generate_skeleton(f.read())
|
||||
injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n"
|
||||
except Exception as e:
|
||||
print(f"Error gathering context for {dep_file}: {e}")
|
||||
if len(injected_context) > 15000:
|
||||
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
|
||||
# MMA Protocol: Tier 3 and 4 are stateless. Build system directive.
|
||||
if role in ['tier3', 'tier3-worker']:
|
||||
system_directive = (
|
||||
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). "
|
||||
"Your goal is to implement specific code changes or tests based on the provided task. "
|
||||
"You have access to tools for reading and writing files (Read, Write, Edit), "
|
||||
"codebase investigation (Glob, Grep), "
|
||||
"version control (Bash git commands), and web tools (WebFetch, WebSearch). "
|
||||
"You CAN execute PowerShell scripts via Bash for verification and testing. "
|
||||
"Follow TDD and return success status or code changes. No pleasantries, no conversational filler."
|
||||
)
|
||||
elif role in ['tier4', 'tier4-qa']:
|
||||
system_directive = (
|
||||
"STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. "
|
||||
"Your goal is to analyze errors, summarize logs, or verify tests. "
|
||||
"You have access to tools for reading files and exploring the codebase (Read, Glob, Grep). "
|
||||
"You CAN execute PowerShell scripts via Bash (read-only) for diagnostics. "
|
||||
"ONLY output the requested analysis. No pleasantries."
|
||||
)
|
||||
else:
|
||||
system_directive = (
|
||||
f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. "
|
||||
"ONLY output the requested text. No pleasantries."
|
||||
)
|
||||
command_text = f"{system_directive}\n\n{injected_context}\n\n"
|
||||
# Inline documents to ensure sub-agent has context in headless mode
|
||||
for doc in docs:
|
||||
if os.path.exists(doc):
|
||||
try:
|
||||
with open(doc, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
|
||||
except Exception as e:
|
||||
print(f"Error inlining {doc}: {e}")
|
||||
command_text += f"\n\nTASK: {prompt}\n\n"
|
||||
# Spawn claude CLI non-interactively via PowerShell
|
||||
ps_command = (
|
||||
"if (Test-Path 'C:\\projects\\misc\\setup_claude.ps1') "
|
||||
"{ . 'C:\\projects\\misc\\setup_claude.ps1' }; "
|
||||
f"claude --model {model} --print"
|
||||
)
|
||||
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
|
||||
try:
|
||||
env = os.environ.copy()
|
||||
env['CLAUDE_CLI_HOOK_CONTEXT'] = 'mma_headless'
|
||||
process = subprocess.run(
|
||||
cmd,
|
||||
input=command_text,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding='utf-8',
|
||||
env=env
|
||||
)
|
||||
# claude --print outputs plain text — no JSON parsing needed
|
||||
result = process.stdout if process.stdout else f"Error: {process.stderr}"
|
||||
log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
|
||||
print(f"Sub-agent log created: {log_file}")
|
||||
return result
|
||||
except Exception as e:
|
||||
err_msg = f"Execution failed: {str(e)}"
|
||||
log_delegation(role, command_text, err_msg)
|
||||
return err_msg
|
||||
|
||||
def create_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Claude MMA Execution Script")
|
||||
parser.add_argument(
|
||||
"--role",
|
||||
choices=['tier1', 'tier2', 'tier3', 'tier4',
|
||||
'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
|
||||
help="The tier role to execute"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--task-file",
|
||||
type=str,
|
||||
help="TOML file defining the task"
|
||||
)
|
||||
parser.add_argument(
|
||||
"prompt",
|
||||
type=str,
|
||||
nargs='?',
|
||||
help="The prompt for the tier (optional if --task-file is used)"
|
||||
)
|
||||
return parser
|
||||
|
||||
parser = argparse.ArgumentParser(description="Claude MMA Execution Script")
|
||||
parser.add_argument(
|
||||
"--role",
|
||||
choices=['tier1', 'tier2', 'tier3', 'tier4',
|
||||
'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
|
||||
help="The tier role to execute"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--task-file",
|
||||
type=str,
|
||||
help="TOML file defining the task"
|
||||
)
|
||||
parser.add_argument(
|
||||
"prompt",
|
||||
type=str,
|
||||
nargs='?',
|
||||
help="The prompt for the tier (optional if --task-file is used)"
|
||||
)
|
||||
return parser
|
||||
|
||||
def main() -> None:
|
||||
parser = create_parser()
|
||||
args = parser.parse_args()
|
||||
role = args.role
|
||||
prompt = args.prompt
|
||||
docs = []
|
||||
|
||||
if args.task_file and os.path.exists(args.task_file):
|
||||
with open(args.task_file, "rb") as f:
|
||||
task_data = tomllib.load(f)
|
||||
role = task_data.get("role", role)
|
||||
prompt = task_data.get("prompt", prompt)
|
||||
docs = task_data.get("docs", [])
|
||||
|
||||
if not role or not prompt:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
if not docs:
|
||||
docs = get_role_documents(role)
|
||||
|
||||
# Extract @file references from the prompt
|
||||
file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt)
|
||||
for ref in file_refs:
|
||||
if os.path.exists(ref) and ref not in docs:
|
||||
docs.append(ref)
|
||||
|
||||
print(f"Executing role: {role} with docs: {docs}")
|
||||
result = execute_agent(role, prompt, docs)
|
||||
print(result)
|
||||
|
||||
parser = create_parser()
|
||||
args = parser.parse_args()
|
||||
role = args.role
|
||||
prompt = args.prompt
|
||||
docs = []
|
||||
if args.task_file and os.path.exists(args.task_file):
|
||||
with open(args.task_file, "rb") as f:
|
||||
task_data = tomllib.load(f)
|
||||
role = task_data.get("role", role)
|
||||
prompt = task_data.get("prompt", prompt)
|
||||
docs = task_data.get("docs", [])
|
||||
if not role or not prompt:
|
||||
parser.print_help()
|
||||
return
|
||||
if not docs:
|
||||
docs = get_role_documents(role)
|
||||
# Extract @file references from the prompt
|
||||
file_refs: list[str] = re.findall(r"@([\w./\\]+)", prompt)
|
||||
for ref in file_refs:
|
||||
if os.path.exists(ref) and ref not in docs:
|
||||
docs.append(ref)
|
||||
print(f"Executing role: {role} with docs: {docs}")
|
||||
result = execute_agent(role, prompt, docs)
|
||||
print(result)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
|
||||
@@ -6,83 +6,72 @@ import os
|
||||
# Add project root to sys.path so we can import api_hook_client
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
sys.path.append(project_root)
|
||||
|
||||
try:
|
||||
from api_hook_client import ApiHookClient
|
||||
from api_hook_client import ApiHookClient
|
||||
except ImportError:
|
||||
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
def main() -> None:
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
|
||||
logging.debug("Claude Tool Bridge script started.")
|
||||
try:
|
||||
input_data = sys.stdin.read()
|
||||
if not input_data:
|
||||
logging.debug("No input received from stdin. Exiting gracefully.")
|
||||
return
|
||||
logging.debug(f"Received raw input data: {input_data}")
|
||||
try:
|
||||
hook_input = json.loads(input_data)
|
||||
except json.JSONDecodeError:
|
||||
logging.error("Failed to decode JSON from stdin.")
|
||||
print(json.dumps({"decision": "deny", "reason": "Invalid JSON received from stdin."}))
|
||||
return
|
||||
|
||||
# Claude Code PreToolUse hook format: tool_name + tool_input
|
||||
tool_name = hook_input.get('tool_name')
|
||||
tool_input = hook_input.get('tool_input', {})
|
||||
|
||||
if tool_name is None:
|
||||
logging.error("Could not determine tool name from input. Expected 'tool_name'.")
|
||||
print(json.dumps({"decision": "deny", "reason": "Missing 'tool_name' in hook input."}))
|
||||
return
|
||||
|
||||
if not isinstance(tool_input, dict):
|
||||
logging.warning(f"tool_input is not a dict: {tool_input}. Treating as empty.")
|
||||
tool_input = {}
|
||||
|
||||
logging.debug(f"Resolved tool_name: '{tool_name}', tool_input: {tool_input}")
|
||||
|
||||
# Check context — if not running via Manual Slop, pass through
|
||||
hook_context = os.environ.get("CLAUDE_CLI_HOOK_CONTEXT")
|
||||
logging.debug(f"Checking CLAUDE_CLI_HOOK_CONTEXT: '{hook_context}'")
|
||||
|
||||
if hook_context == 'mma_headless':
|
||||
# Sub-agents in headless MMA mode: auto-allow all tools
|
||||
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'mma_headless'. Allowing for sub-agent.")
|
||||
print(json.dumps({"decision": "allow", "reason": "Sub-agent headless mode (MMA)."}))
|
||||
return
|
||||
|
||||
if hook_context != 'manual_slop':
|
||||
# Not a programmatic Manual Slop session — allow through silently
|
||||
logging.debug(f"CLAUDE_CLI_HOOK_CONTEXT is '{hook_context}', not 'manual_slop'. Allowing.")
|
||||
print(json.dumps({"decision": "allow", "reason": f"Non-programmatic usage (CLAUDE_CLI_HOOK_CONTEXT={hook_context})."}))
|
||||
return
|
||||
|
||||
# manual_slop context: route to GUI for approval
|
||||
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'manual_slop'. Routing to API Hook Client.")
|
||||
client = ApiHookClient(base_url="http://127.0.0.1:8999")
|
||||
try:
|
||||
logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_input}")
|
||||
response = client.request_confirmation(tool_name, tool_input)
|
||||
if response and response.get('approved') is True:
|
||||
logging.debug("User approved tool execution.")
|
||||
print(json.dumps({"decision": "allow"}))
|
||||
else:
|
||||
reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.'
|
||||
logging.debug(f"User denied tool execution. Reason: {reason}")
|
||||
print(json.dumps({"decision": "deny", "reason": reason}))
|
||||
except Exception as e:
|
||||
logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
|
||||
print(json.dumps({"decision": "deny", "reason": f"Manual Slop hook server unreachable: {str(e)}"}))
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error in bridge: {str(e)}", exc_info=True)
|
||||
print(json.dumps({"decision": "deny", "reason": f"Internal bridge error: {str(e)}"}))
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
|
||||
logging.debug("Claude Tool Bridge script started.")
|
||||
try:
|
||||
input_data = sys.stdin.read()
|
||||
if not input_data:
|
||||
logging.debug("No input received from stdin. Exiting gracefully.")
|
||||
return
|
||||
logging.debug(f"Received raw input data: {input_data}")
|
||||
try:
|
||||
hook_input = json.loads(input_data)
|
||||
except json.JSONDecodeError:
|
||||
logging.error("Failed to decode JSON from stdin.")
|
||||
print(json.dumps({"decision": "deny", "reason": "Invalid JSON received from stdin."}))
|
||||
return
|
||||
# Claude Code PreToolUse hook format: tool_name + tool_input
|
||||
tool_name = hook_input.get('tool_name')
|
||||
tool_input = hook_input.get('tool_input', {})
|
||||
if tool_name is None:
|
||||
logging.error("Could not determine tool name from input. Expected 'tool_name'.")
|
||||
print(json.dumps({"decision": "deny", "reason": "Missing 'tool_name' in hook input."}))
|
||||
return
|
||||
if not isinstance(tool_input, dict):
|
||||
logging.warning(f"tool_input is not a dict: {tool_input}. Treating as empty.")
|
||||
tool_input = {}
|
||||
logging.debug(f"Resolved tool_name: '{tool_name}', tool_input: {tool_input}")
|
||||
# Check context — if not running via Manual Slop, pass through
|
||||
hook_context = os.environ.get("CLAUDE_CLI_HOOK_CONTEXT")
|
||||
logging.debug(f"Checking CLAUDE_CLI_HOOK_CONTEXT: '{hook_context}'")
|
||||
if hook_context == 'mma_headless':
|
||||
# Sub-agents in headless MMA mode: auto-allow all tools
|
||||
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'mma_headless'. Allowing for sub-agent.")
|
||||
print(json.dumps({"decision": "allow", "reason": "Sub-agent headless mode (MMA)."}))
|
||||
return
|
||||
if hook_context != 'manual_slop':
|
||||
# Not a programmatic Manual Slop session — allow through silently
|
||||
logging.debug(f"CLAUDE_CLI_HOOK_CONTEXT is '{hook_context}', not 'manual_slop'. Allowing.")
|
||||
print(json.dumps({"decision": "allow", "reason": f"Non-programmatic usage (CLAUDE_CLI_HOOK_CONTEXT={hook_context})."}))
|
||||
return
|
||||
# manual_slop context: route to GUI for approval
|
||||
logging.debug("CLAUDE_CLI_HOOK_CONTEXT is 'manual_slop'. Routing to API Hook Client.")
|
||||
client = ApiHookClient(base_url="http://127.0.0.1:8999")
|
||||
try:
|
||||
logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_input}")
|
||||
response = client.request_confirmation(tool_name, tool_input)
|
||||
if response and response.get('approved') is True:
|
||||
logging.debug("User approved tool execution.")
|
||||
print(json.dumps({"decision": "allow"}))
|
||||
else:
|
||||
reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.'
|
||||
logging.debug(f"User denied tool execution. Reason: {reason}")
|
||||
print(json.dumps({"decision": "deny", "reason": reason}))
|
||||
except Exception as e:
|
||||
logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
|
||||
print(json.dumps({"decision": "deny", "reason": f"Manual Slop hook server unreachable: {str(e)}"}))
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error in bridge: {str(e)}", exc_info=True)
|
||||
print(json.dumps({"decision": "deny", "reason": f"Internal bridge error: {str(e)}"}))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
|
||||
@@ -2,13 +2,11 @@ import os
|
||||
import re
|
||||
|
||||
with open('mcp_client.py', 'r', encoding='utf-8') as f:
|
||||
content: str = f.read()
|
||||
|
||||
# 1. Add import os if not there
|
||||
content: str = f.read()
|
||||
# 1. Add import os if not there
|
||||
if 'import os' not in content:
|
||||
content: str = content.replace('import summarize', 'import os\nimport summarize')
|
||||
|
||||
# 2. Add the functions before "# ------------------------------------------------------------------ web tools"
|
||||
content: str = content.replace('import summarize', 'import os\nimport summarize')
|
||||
# 2. Add the functions before "# ------------------------------------------------------------------ web tools"
|
||||
functions_code: str = r'''
|
||||
def py_find_usages(path: str, name: str) -> str:
|
||||
"""Finds exact string matches of a symbol in a given file or directory."""
|
||||
@@ -184,11 +182,10 @@ content: str = content.replace('# ----------------------------------------------
|
||||
# 3. Update TOOL_NAMES
|
||||
old_tool_names_match: re.Match | None = re.search(r'TOOL_NAMES\s*=\s*\{([^}]*)\}', content)
|
||||
if old_tool_names_match:
|
||||
old_names: str = old_tool_names_match.group(1)
|
||||
new_names: str = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"'
|
||||
content: str = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}')
|
||||
|
||||
# 4. Update dispatch
|
||||
old_names: str = old_tool_names_match.group(1)
|
||||
new_names: str = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"'
|
||||
content: str = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}')
|
||||
# 4. Update dispatch
|
||||
dispatch_additions: str = r'''
|
||||
if tool_name == "py_find_usages":
|
||||
return py_find_usages(tool_input.get("path", ""), tool_input.get("name", ""))
|
||||
@@ -205,7 +202,7 @@ dispatch_additions: str = r'''
|
||||
return f"ERROR: unknown MCP tool '{tool_name}'"
|
||||
'''
|
||||
content: str = re.sub(
|
||||
r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content)
|
||||
r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content)
|
||||
|
||||
# 5. Update MCP_TOOL_SPECS
|
||||
mcp_tool_specs_addition: str = r'''
|
||||
@@ -283,9 +280,9 @@ mcp_tool_specs_addition: str = r'''
|
||||
'''
|
||||
|
||||
content: str = re.sub(
|
||||
r'\]\s*$', mcp_tool_specs_addition.strip(), content)
|
||||
r'\]\s*$', mcp_tool_specs_addition.strip(), content)
|
||||
|
||||
with open('mcp_client.py', 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
f.write(content)
|
||||
|
||||
print("Injected new tools.")
|
||||
|
||||
@@ -26,69 +26,65 @@ from mcp.types import Tool, TextContent
|
||||
# run_powershell is handled by shell_runner, not mcp_client.dispatch()
|
||||
# Define its spec here since it's not in MCP_TOOL_SPECS
|
||||
RUN_POWERSHELL_SPEC = {
|
||||
"name": "run_powershell",
|
||||
"description": (
|
||||
"Run a PowerShell script within the project base directory. "
|
||||
"Returns combined stdout, stderr, and exit code. "
|
||||
"60-second timeout. Use for builds, tests, and system commands."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"script": {
|
||||
"type": "string",
|
||||
"description": "PowerShell script content to execute."
|
||||
}
|
||||
},
|
||||
"required": ["script"]
|
||||
}
|
||||
"name": "run_powershell",
|
||||
"description": (
|
||||
"Run a PowerShell script within the project base directory. "
|
||||
"Returns combined stdout, stderr, and exit code. "
|
||||
"60-second timeout. Use for builds, tests, and system commands."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"script": {
|
||||
"type": "string",
|
||||
"description": "PowerShell script content to execute."
|
||||
}
|
||||
},
|
||||
"required": ["script"]
|
||||
}
|
||||
}
|
||||
|
||||
server = Server("manual-slop-tools")
|
||||
|
||||
|
||||
@server.list_tools()
|
||||
async def list_tools() -> list[Tool]:
|
||||
tools = []
|
||||
for spec in mcp_client.MCP_TOOL_SPECS:
|
||||
tools.append(Tool(
|
||||
name=spec["name"],
|
||||
description=spec["description"],
|
||||
inputSchema=spec["parameters"],
|
||||
))
|
||||
# Add run_powershell
|
||||
tools.append(Tool(
|
||||
name=RUN_POWERSHELL_SPEC["name"],
|
||||
description=RUN_POWERSHELL_SPEC["description"],
|
||||
inputSchema=RUN_POWERSHELL_SPEC["parameters"],
|
||||
))
|
||||
return tools
|
||||
|
||||
tools = []
|
||||
for spec in mcp_client.MCP_TOOL_SPECS:
|
||||
tools.append(Tool(
|
||||
name=spec["name"],
|
||||
description=spec["description"],
|
||||
inputSchema=spec["parameters"],
|
||||
))
|
||||
# Add run_powershell
|
||||
tools.append(Tool(
|
||||
name=RUN_POWERSHELL_SPEC["name"],
|
||||
description=RUN_POWERSHELL_SPEC["description"],
|
||||
inputSchema=RUN_POWERSHELL_SPEC["parameters"],
|
||||
))
|
||||
return tools
|
||||
|
||||
@server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
try:
|
||||
if name == "run_powershell":
|
||||
script = arguments.get("script", "")
|
||||
result = shell_runner.run_powershell(script, os.getcwd())
|
||||
else:
|
||||
result = mcp_client.dispatch(name, arguments)
|
||||
return [TextContent(type="text", text=str(result))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=f"ERROR: {e}")]
|
||||
|
||||
try:
|
||||
if name == "run_powershell":
|
||||
script = arguments.get("script", "")
|
||||
result = shell_runner.run_powershell(script, os.getcwd())
|
||||
else:
|
||||
result = mcp_client.dispatch(name, arguments)
|
||||
return [TextContent(type="text", text=str(result))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=f"ERROR: {e}")]
|
||||
|
||||
async def main() -> None:
|
||||
# Configure mcp_client with the project root so py_* tools are not ACCESS DENIED
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
mcp_client.configure([], extra_base_dirs=[project_root])
|
||||
async with stdio_server() as (read_stream, write_stream):
|
||||
await server.run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
server.create_initialization_options(),
|
||||
)
|
||||
|
||||
# Configure mcp_client with the project root so py_* tools are not ACCESS DENIED
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
mcp_client.configure([], extra_base_dirs=[project_root])
|
||||
async with stdio_server() as (read_stream, write_stream):
|
||||
await server.run(
|
||||
read_stream,
|
||||
write_stream,
|
||||
server.create_initialization_options(),
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
asyncio.run(main())
|
||||
|
||||
@@ -18,8 +18,9 @@ for root, dirs, files in os.walk('.'):
|
||||
except Exception:
|
||||
continue
|
||||
counts: list[int] = [0, 0, 0] # nr, up, uv
|
||||
|
||||
def scan(scope: ast.AST, prefix: str = '') -> None:
|
||||
# Iterate top-level nodes in this scope
|
||||
# Iterate top-level nodes in this scope
|
||||
for node in ast.iter_child_nodes(scope):
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
if node.returns is None:
|
||||
|
||||
Reference in New Issue
Block a user