updates to tools and mma skills

2026-02-28 07:51:02 -05:00
parent db069abe83
commit db118f0a5c
21 changed files with 1070 additions and 650 deletions


@@ -15,6 +15,12 @@ tools:
 - discovered_tool_fetch_url
 - activate_skill
 - discovered_tool_run_powershell
+- discovered_tool_py_find_usages
+- discovered_tool_py_get_imports
+- discovered_tool_py_check_syntax
+- discovered_tool_py_get_hierarchy
+- discovered_tool_py_get_docstring
+- discovered_tool_get_tree
 ---
 STRICT SYSTEM DIRECTIVE: You are a Tier 1 Orchestrator.
 Focused on product alignment, high-level planning, and track initialization.


@@ -17,6 +17,12 @@ tools:
 - discovered_tool_fetch_url
 - activate_skill
 - discovered_tool_run_powershell
+- discovered_tool_py_find_usages
+- discovered_tool_py_get_imports
+- discovered_tool_py_check_syntax
+- discovered_tool_py_get_hierarchy
+- discovered_tool_py_get_docstring
+- discovered_tool_get_tree
 ---
 STRICT SYSTEM DIRECTIVE: You are a Tier 2 Tech Lead.
 Focused on architectural design and track execution.


@@ -17,6 +17,12 @@ tools:
 - discovered_tool_fetch_url
 - activate_skill
 - discovered_tool_run_powershell
+- discovered_tool_py_find_usages
+- discovered_tool_py_get_imports
+- discovered_tool_py_check_syntax
+- discovered_tool_py_get_hierarchy
+- discovered_tool_py_get_docstring
+- discovered_tool_get_tree
 ---
 STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor).
 Your goal is to implement specific code changes or tests based on the provided task.


@@ -15,6 +15,12 @@ tools:
 - discovered_tool_fetch_url
 - activate_skill
 - discovered_tool_run_powershell
+- discovered_tool_py_find_usages
+- discovered_tool_py_get_imports
+- discovered_tool_py_check_syntax
+- discovered_tool_py_get_hierarchy
+- discovered_tool_py_get_docstring
+- discovered_tool_get_tree
 ---
 STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent.
 Your goal is to analyze errors, summarize logs, or verify tests.

Binary file not shown.


@@ -1,5 +1,5 @@
 {
-    "name": "get_code_outline",
+    "name": "py_get_code_outline",
     "description": "Get a hierarchical outline of a code file. This returns classes, functions, and methods with their line ranges and brief docstrings. Use this to quickly map out a file's structure before reading specific sections.",
     "parameters": {
         "type": "object",
@@ -13,5 +13,5 @@
             "path"
         ]
     },
-    "command": "python scripts/tool_call.py get_code_outline"
+    "command": "python scripts/tool_call.py py_get_code_outline"
 }
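Specs like this one are dispatched through the `command` field. A minimal sketch of that dispatch step follows; `build_command` and the `--key value` argument style are illustrative assumptions, not how the repo's actual `scripts/tool_call.py` necessarily receives arguments:

```python
import json
import shlex

# Hypothetical dispatcher: turn a tool spec plus call arguments into a shell
# command line. The real tool_call.py may pass arguments differently
# (e.g. as JSON on stdin).
def build_command(spec: dict, args: dict) -> str:
    base = spec["command"]
    # Append each argument as --key value, shell-quoted for safety.
    parts = [base] + [f"--{k} {shlex.quote(str(v))}" for k, v in args.items()]
    return " ".join(parts)

spec = json.loads("""
{
  "name": "py_get_code_outline",
  "command": "python scripts/tool_call.py py_get_code_outline"
}
""")
print(build_command(spec, {"path": "mcp_client.py"}))
# → python scripts/tool_call.py py_get_code_outline --path mcp_client.py
```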


@@ -1,5 +1,5 @@
 {
-    "name": "get_python_skeleton",
+    "name": "py_get_skeleton",
     "description": "Get a skeleton view of a Python file. This returns all classes and function signatures with their docstrings, but replaces function bodies with '...'. Use this to understand module interfaces without reading the full implementation.",
     "parameters": {
         "type": "object",
@@ -13,5 +13,5 @@
             "path"
         ]
     },
-    "command": "python scripts/tool_call.py get_python_skeleton"
+    "command": "python scripts/tool_call.py py_get_skeleton"
 }
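The repo implements `py_get_skeleton` with tree-sitter; the same interface view can be approximated with the stdlib `ast` module. This is a simplified sketch (`sketch_skeleton` is a hypothetical name, and it handles only top-level functions), not the actual implementation:

```python
import ast

def sketch_skeleton(source: str) -> str:
    """Render top-level function signatures with docstrings, bodies as '...'."""
    out = []
    for node in ast.parse(source).body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            args = ", ".join(a.arg for a in node.args.args)
            out.append(f"def {node.name}({args}):")
            doc = ast.get_docstring(node)
            if doc:
                out.append(f'    """{doc}"""')
            out.append("    ...")  # body elided, as py_get_skeleton does
    return "\n".join(out)

src = '''
def add(a, b):
    "Return a + b."
    return a + b
'''
print(sketch_skeleton(src))
```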


@@ -7,7 +7,7 @@
 - [ ] Task: Conductor - User Manual Verification 'Phase 1' (Protocol in workflow.md)
 ## Phase 2: AST Skeleton Extraction (Skeleton Views)
-- [x] Task: Enhance `mcp_client.py` with `get_python_skeleton` functionality using `tree-sitter` to extract signatures and docstrings. e950601
+- [x] Task: Enhance `mcp_client.py` with `py_get_skeleton` functionality using `tree-sitter` to extract signatures and docstrings. e950601
 - [x] Task: Update `mma_exec.py` to utilize these skeletons for non-target dependencies when preparing context for Tier 3. e950601
 - [x] Task: Integrate "Interface-level" scrubbed versions into the sub-agent injection logic. e950601
 - [ ] Task: Conductor - User Manual Verification 'Phase 2' (Protocol in workflow.md)


@@ -9,7 +9,7 @@
 5. **User Experience First:** Every decision should prioritize user experience
 6. **Non-Interactive & CI-Aware:** Prefer non-interactive commands. Use `CI=true` for watch-mode tools (tests, linters) to ensure single execution.
 7. **MMA Tiered Delegation is Mandatory:** The Conductor acts as a Tier 1/2 Orchestrator. You MUST delegate all non-trivial coding to Tier 3 Workers and all error analysis to Tier 4 QA Agents. Do NOT perform large file writes directly.
-8. **Mandatory Research-First Protocol:** Before reading the full content of any file over 50 lines, you MUST use `get_file_summary`, `get_python_skeleton`, or `get_code_outline` to map the architecture and identify specific target ranges. Use `get_git_diff` to understand recent changes.
+8. **Mandatory Research-First Protocol:** Before reading the full content of any file over 50 lines, you MUST use `get_file_summary`, `py_get_skeleton`, `py_get_code_outline`, or `py_get_docstring` to map the architecture and identify specific target ranges. Use `get_git_diff` to understand recent changes. Use `py_find_usages` to locate where symbols are used.
 ## Task Workflow
@@ -24,11 +24,10 @@ All tasks follow a strict lifecycle:
 2. **Mark In Progress:** Before beginning work, edit `plan.md` and change the task from `[ ]` to `[~]`
 3. **High-Signal Research Phase:**
-    - **Identify Dependencies:** Use `list_directory` and `grep_search` to find relevant files.
-    - **Map Architecture:** Use `get_code_outline` or `get_python_skeleton` on identified files to understand their structure.
+    - **Identify Dependencies:** Use `list_directory`, `get_tree`, and `py_get_imports` to map file relations.
+    - **Map Architecture:** Use `py_get_code_outline` or `py_get_skeleton` on identified files to understand their structure.
     - **Analyze Changes:** Use `get_git_diff` if the task involves modifying recently updated code.
     - **Minimize Token Burn:** Only use `read_file` with `start_line`/`end_line` for specific implementation details once target areas are identified.
 4. **Write Failing Tests (Red Phase):**
     - **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria.
     - Take the code generated by the Worker and apply it.


@@ -26,17 +26,19 @@ If you run a test or command that fails with a significant error or large traceback:
 1. **DO NOT** analyze the raw logs in your own context window.
 2. **DO** spawn a stateless Tier 4 agent to diagnose the failure.
 3. *Command:* `uv run python scripts/mma_exec.py --role tier4-qa "Analyze this failure and summarize the root cause: [LOG_DATA]"`
-4. **Mandatory Research-First Protocol:** Avoid direct `read_file` calls for any file over 50 lines. Use `get_file_summary`, `get_python_skeleton`, or `get_code_outline` first to identify relevant sections. Use `git diff` to understand changes.
+4. **Mandatory Research-First Protocol:** Avoid direct `read_file` calls for any file over 50 lines. Use `get_file_summary`, `py_get_skeleton`, or `py_get_code_outline` first to identify relevant sections. Use `git diff` to understand changes.
 ## 3. Persistent Tech Lead Memory (Tier 2)
 Unlike the stateless sub-agents (Tiers 3 & 4), the **Tier 2 Tech Lead** maintains persistent context throughout the implementation of a track. Do NOT apply "Context Amnesia" to your own session during track implementation. You are responsible for the continuity of the technical strategy.
 ## 4. AST Skeleton & Outline Views
 To minimize context bloat for Tier 2 & 3:
-1. Use `get_code_outline` to map out the structure of a file.
-2. Use `get_python_skeleton` to understand the interface and docstrings of dependencies.
-3. Only use `read_file` with `start_line` and `end_line` for specific implementation details once target areas are identified.
-4. Tier 3 workers MUST NOT read the full content of unrelated files.
+1. Use `py_get_code_outline` or `get_tree` to map out the structure of a file or project.
+2. Use `py_get_skeleton` and `py_get_imports` to understand the interface, docstrings, and dependencies of modules.
+3. Use `py_find_usages` to pinpoint where a function or class is called instead of searching the whole codebase.
+4. Use `py_check_syntax` after making string replacements to ensure the file is still syntactically valid.
+5. Only use `read_file` with `start_line` and `end_line` for specific implementation details once target areas are identified.
+6. Tier 3 workers MUST NOT read the full content of unrelated files.
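The `py_check_syntax` step can be approximated with the stdlib `compile()` builtin. A minimal sketch; the `check_syntax` name and dict result shape are assumptions, not the tool's actual output format:

```python
# Hypothetical syntax check in the spirit of py_check_syntax: compile() parses
# the source without executing it, so it catches syntax errors only.
def check_syntax(source: str, filename: str = "<edited>") -> dict:
    try:
        compile(source, filename, "exec")
        return {"ok": True}
    except SyntaxError as e:
        return {"ok": False, "line": e.lineno, "error": e.msg}

print(check_syntax("def f():\n    return 1"))   # {'ok': True}
print(check_syntax("def f(:\n    return 1")["ok"])  # False
```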
 <examples>
 ### Example 1: Spawning a Tier 4 QA Agent


@@ -1,125 +1,130 @@ (file rewritten; new version shown)
import tokenize
import io
import os
import sys

def format_code(source: str) -> str:
    """
    Formats Python code to use exactly 1 space for indentation (including continuations),
    max 1 blank line between top-level definitions, and 0 blank lines inside
    function/method bodies.

    Args:
        source: The Python source code to format.

    Returns:
        The formatted source code.
    """
    if not source:
        return ""
    try:
        tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
    except tokenize.TokenError:
        return source  # Return as-is if it's not valid python (e.g. template files)
    lines = source.splitlines(keepends=True)
    num_lines = len(lines)
    block_level = 0
    paren_level = 0
    in_function_stack = []
    expecting_function_indent = False
    line_indent = {}
    line_is_blank = {i: True for i in range(1, num_lines + 2)}
    line_is_string_interior = {i: False for i in range(1, num_lines + 2)}
    line_seen = set()
    pending_blank_lines = []
    for tok in tokens:
        t_type = tok.type
        t_string = tok.string
        start_line, _ = tok.start
        end_line, _ = tok.end
        if t_type == tokenize.STRING:
            for l in range(start_line + 1, end_line + 1):
                line_is_string_interior[l] = True
        if t_type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT, tokenize.ENDMARKER):
            for l in range(start_line, end_line + 1):
                line_is_blank[l] = False
            pending_blank_lines = []
        if t_type == tokenize.INDENT:
            block_level += 1
            if expecting_function_indent:
                in_function_stack.append(block_level)
                expecting_function_indent = False
        elif t_type == tokenize.DEDENT:
            block_level -= 1
            if in_function_stack and block_level < in_function_stack[-1]:
                in_function_stack.pop()
            for l in pending_blank_lines:
                line_indent[l] = block_level + paren_level
        if t_string in (')', ']', '}'):
            paren_level -= 1
        if start_line not in line_seen:
            line_indent[start_line] = block_level + paren_level
            if t_type not in (tokenize.INDENT, tokenize.DEDENT):
                line_seen.add(start_line)
        if t_type in (tokenize.NL, tokenize.NEWLINE):
            pending_blank_lines.append(start_line)
        if t_type == tokenize.NAME and t_string == 'def':
            expecting_function_indent = True
        if t_string in ('(', '[', '{'):
            paren_level += 1
    output = []
    consecutive_blanks = 0
    for i in range(1, num_lines + 1):
        if line_is_string_interior[i]:
            output.append(lines[i-1])
            continue
        if line_is_blank[i]:
            indent = line_indent.get(i, 0)
            if indent > 0:
                continue
            else:
                if consecutive_blanks < 1:
                    output.append("\n")
                consecutive_blanks += 1
                continue
        original_line = lines[i-1]
        indent = line_indent.get(i, 0)
        stripped = original_line.lstrip()
        is_def_start = stripped.startswith(('def ', 'class ', 'async def ', '@'))
        if is_def_start and output and consecutive_blanks == 0:
            prev_line = output[-1].strip()
            if prev_line and not prev_line.endswith(':') and not prev_line.startswith('@'):
                output.append("\n")
                consecutive_blanks += 1
        consecutive_blanks = 0
        output.append(" " * indent + stripped)
        if not stripped.endswith('\n') and i < num_lines:
            output[-1] += '\n'
    if output and not output[-1].endswith('\n'):
        output[-1] += '\n'
    return "".join(output)

def process_file(file_path: str, write: bool) -> None:
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        formatted = format_code(content)
        if write:
            if formatted != content:
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(formatted)
                print(f"Formatted: {file_path}")
        else:
            sys.stdout.reconfigure(encoding='utf-8')
            sys.stdout.write(formatted)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")

def main() -> None:
    import argparse
    parser = argparse.ArgumentParser(description="AI-optimized Python code formatter.")
    parser.add_argument("paths", nargs="+", help="Files or directories to format.")
    parser.add_argument("--write", action="store_true", help="Write changes back to files.")
    parser.add_argument("--exclude", nargs="*", default=[".venv", "__pycache__", ".git"], help="Directories to exclude.")
    args = parser.parse_args()
    for path in args.paths:
        if os.path.isfile(path):
            process_file(path, args.write)
        elif os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                dirs[:] = [d for d in dirs if d not in args.exclude]
                for file in files:
                    if file.endswith(".py"):
                        process_file(os.path.join(root, file), args.write)

if __name__ == "__main__":
    main()
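The formatter's core idea, deriving each line's nesting from `INDENT`/`DEDENT` tokens rather than from raw whitespace, can be isolated in a short sketch (`line_depths` is an illustrative reduction, not a function from the file):

```python
import io
import tokenize

# Reduced sketch: record a nesting depth for each source line from
# INDENT/DEDENT tokens -- the same signal format_code uses to re-indent
# with one space per level.
def line_depths(source: str) -> dict:
    depths, depth = {}, 0
    for tok in tokenize.generate_tokens(io.StringIO(source).readline):
        if tok.type == tokenize.INDENT:
            depth += 1
        elif tok.type == tokenize.DEDENT:
            depth -= 1
        # First token on a line wins; later tokens don't change that line.
        depths.setdefault(tok.start[0], depth)
    return depths

src = "def f():\n    if True:\n        return 1\n"
print(line_depths(src))
```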


@@ -3,20 +3,20 @@ import sys
files = ['ai_client.py', 'aggregate.py', 'mcp_client.py', 'shell_runner.py']
for file_path in files:
    print(f"Checking {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    for i, line in enumerate(lines):
        if line.strip().startswith('def '):
            if '->' not in line:
                # Check next line if it's a multiline def
                if '):' not in line:
                    full_def = line
                    j = i + 1
                    while j < len(lines) and '):' not in lines[j-1]:
                        full_def += lines[j]
                        j += 1
                    if '->' not in full_def:
                        print(f"  Missing hint at line {i+1}: {line.strip()}")
                else:
                    print(f"  Missing hint at line {i+1}: {line.strip()}")


@@ -3,29 +3,27 @@ import sys (blank lines removed; new version shown)
files = ['ai_client.py', 'aggregate.py', 'mcp_client.py', 'shell_runner.py']
for file_path in files:
    print(f"Checking {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Find all function definitions
    # This regex is simplified and might miss some edge cases (like multi-line defs)
    # But it's better than nothing.
    defs = re.finditer(r'def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\((.*?)\)\s*(->\s*.*?)?:', content, re.DOTALL)
    for m in defs:
        name = m.group(1)
        args = m.group(2).strip()
        ret = m.group(3)
        if not ret:
            print(f"  Missing return type: {name}({args})")
        # Check arguments
        if args:
            arg_list = [a.strip() for a in args.split(',')]
            for arg in arg_list:
                if not arg or arg == 'self' or arg == 'cls':
                    continue
                if ':' not in arg and '=' not in arg:
                    print(f"  Missing arg type: {name} -> {arg}")
                elif ':' not in arg and '=' in arg:
                    # arg=val (missing type)
                    print(f"  Missing arg type: {name} -> {arg}")
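As the comment in the script concedes, the regex misses edge cases such as multi-line defs. An `ast`-based pass (a sketch, not part of the commit) handles those reliably because the parser, not a pattern, finds the definitions:

```python
import ast

def missing_return_types(source: str) -> list:
    """Return names of functions that lack a return annotation."""
    missing = []
    for node in ast.walk(ast.parse(source)):
        # node.returns is None exactly when there is no `-> ...` annotation,
        # regardless of how many lines the signature spans.
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.returns is None:
            missing.append(node.name)
    return missing

src = """
def typed(x: int) -> int:
    return x

def untyped(
    a,
    b,
):
    return a + b
"""
print(missing_return_types(src))  # → ['untyped']
```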


@@ -7,151 +7,131 @@ import os
# This helps in cases where the script is run from different directories # This helps in cases where the script is run from different directories
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path: if project_root not in sys.path:
sys.path.append(project_root) sys.path.append(project_root)
try: try:
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
except ImportError: except ImportError:
# Fallback if the script is run from the project root directly, # Fallback if the script is run from the project root directly,
# or if the above path append didn't work for some reason. # or if the above path append didn't work for some reason.
try: try:
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
except ImportError: except ImportError:
# Use basic print for fatal errors if logging isn't set up yet # Use basic print for fatal errors if logging isn't set up yet
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr) print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
sys.exit(1) # Exit if the core dependency cannot be imported sys.exit(1) # Exit if the core dependency cannot be imported
def main(): def main():
# Setup basic logging to stderr. # Setup basic logging to stderr.
# Set level to DEBUG to capture all messages, including debug info. # Set level to DEBUG to capture all messages, including debug info.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
logging.debug("CLI Tool Bridge script started.")
logging.debug("CLI Tool Bridge script started.") try:
# 1. Read JSON from sys.stdin
try: input_data = sys.stdin.read()
# 1. Read JSON from sys.stdin if not input_data:
input_data = sys.stdin.read() logging.debug("No input received from stdin. Exiting gracefully.")
return
if not input_data: logging.debug(f"Received raw input data: {input_data}")
logging.debug("No input received from stdin. Exiting gracefully.") try:
return hook_input = json.loads(input_data)
except json.JSONDecodeError:
logging.debug(f"Received raw input data: {input_data}") logging.error("Failed to decode JSON from stdin.")
print(json.dumps({
try: "decision": "deny",
hook_input = json.loads(input_data) "reason": "Invalid JSON received from stdin."
except json.JSONDecodeError: }))
logging.error("Failed to decode JSON from stdin.") return
print(json.dumps({ # Initialize variables for tool name and arguments
"decision": "deny", tool_name = None
"reason": "Invalid JSON received from stdin." tool_args = {}
})) # 2. Try to parse input in Gemini API format ('name', 'input')
return logging.debug("Attempting to parse input in Gemini API format ('name', 'input').")
if 'name' in hook_input and hook_input['name'] is not None:
# Initialize variables for tool name and arguments tool_name = hook_input['name']
tool_name = None logging.debug(f"Found Gemini API format tool name: {tool_name}")
tool_args = {} if 'input' in hook_input and hook_input['input'] is not None:
if isinstance(hook_input['input'], dict):
# 2. Try to parse input in Gemini API format ('name', 'input') tool_args = hook_input['input']
logging.debug("Attempting to parse input in Gemini API format ('name', 'input').") logging.debug(f"Found Gemini API format tool input: {tool_args}")
if 'name' in hook_input and hook_input['name'] is not None: else:
tool_name = hook_input['name'] logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.")
logging.debug(f"Found Gemini API format tool name: {tool_name}") # 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input')
if tool_name is None:
if 'input' in hook_input and hook_input['input'] is not None: logging.debug("Gemini API format not fully detected. Falling back to legacy format ('tool_name', 'tool_input').")
if isinstance(hook_input['input'], dict): tool_name = hook_input.get('tool_name')
tool_args = hook_input['input'] if tool_name:
logging.debug(f"Found Gemini API format tool input: {tool_args}") logging.debug(f"Found legacy format tool name: {tool_name}")
else: tool_input_legacy = hook_input.get('tool_input')
logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.") if tool_input_legacy is not None:
if isinstance(tool_input_legacy, dict):
# 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input') tool_args = tool_input_legacy
if tool_name is None: logging.debug(f"Found legacy format tool input: {tool_args}")
logging.debug("Gemini API format not fully detected. Falling back to legacy format ('tool_name', 'tool_input').") else:
tool_name = hook_input.get('tool_name') logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.")
if tool_name: # Final checks on resolved tool_name and tool_args
logging.debug(f"Found legacy format tool name: {tool_name}") if tool_name is None:
logging.error("Could not determine tool name from input.")
tool_input_legacy = hook_input.get('tool_input') print(json.dumps({
if tool_input_legacy is not None: "decision": "deny",
if isinstance(tool_input_legacy, dict): "reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'."
tool_args = tool_input_legacy }))
logging.debug(f"Found legacy format tool input: {tool_args}") return
else: if not isinstance(tool_args, dict):
logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.") logging.error(f"Resolved tool_args is not a dictionary: {tool_args}")
print(json.dumps({
# Final checks on resolved tool_name and tool_args "decision": "deny",
if tool_name is None: "reason": "Resolved tool arguments are not in a valid dictionary format."
logging.error("Could not determine tool name from input.") }))
print(json.dumps({ return
"decision": "deny", logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}")
"reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'." # 4. Check context — if not running via Manual Slop, we pass through (allow)
})) # This prevents the hook from affecting normal CLI usage.
return hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT")
logging.debug(f"Checking GEMINI_CLI_HOOK_CONTEXT: '{hook_context}'")
if not isinstance(tool_args, dict): if hook_context != "manual_slop":
logging.error(f"Resolved tool_args is not a dictionary: {tool_args}") logging.debug(f"GEMINI_CLI_HOOK_CONTEXT is '{hook_context}', NOT 'manual_slop'. Allowing execution without confirmation.")
print(json.dumps({ print(json.dumps({
"decision": "deny", "decision": "allow",
"reason": "Resolved tool arguments are not in a valid dictionary format." "reason": f"Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT={hook_context})."
})) }))
return return
# 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999)
logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}") logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.")
        # 4. Check context — if not running via Manual Slop, we pass through (allow)
        # This prevents the hook from affecting normal CLI usage.
        hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT")
        logging.debug(f"Checking GEMINI_CLI_HOOK_CONTEXT: '{hook_context}'")
        if hook_context != "manual_slop":
            logging.debug(f"GEMINI_CLI_HOOK_CONTEXT is '{hook_context}', NOT 'manual_slop'. Allowing execution without confirmation.")
            print(json.dumps({
                "decision": "allow",
                "reason": f"Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT={hook_context})."
            }))
            return

        # 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999)
        logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.")
        client = ApiHookClient(base_url="http://127.0.0.1:8999")

        try:
            # 6. Request confirmation
            # This is a blocking call that waits for the user in the GUI
            logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_args}")
            response = client.request_confirmation(tool_name, tool_args)

            if response and response.get('approved') is True:
                # 7. Print 'allow' decision
                logging.debug("User approved tool execution.")
                print(json.dumps({"decision": "allow"}))
            else:
                # 8. Print 'deny' decision
                reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.'
                logging.debug(f"User denied tool execution. Reason: {reason}")
                print(json.dumps({
                    "decision": "deny",
                    "reason": reason
                }))
        except Exception as e:
            # 9. Handle cases where hook server is not reachable or other API errors
            # If we ARE in manual_slop context but can't reach the server, we should DENY
            # because the user expects to be in control.
            logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
            print(json.dumps({
                "decision": "deny",
                "reason": f"Manual Slop hook server unreachable or API error: {str(e)}"
            }))
    except Exception as e:
        # Fallback for unexpected errors during initial processing (e.g., stdin read)
        logging.error(f"An unexpected error occurred in the main bridge logic: {str(e)}", exc_info=True)
        print(json.dumps({
            "decision": "deny",
            "reason": f"Internal bridge error: {str(e)}"
        }))


if __name__ == "__main__":
    main()
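The bridge's entire contract with the CLI is a single JSON object printed to stdout. A minimal sketch of the consuming side (the function name and fail-closed default are illustrative assumptions, not part of the bridge itself):

```python
import json

def parse_decision(line: str) -> tuple[bool, str]:
    """Parse a one-line JSON hook decision into (approved, reason)."""
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        # Fail closed: an unreadable decision is treated as a deny.
        return False, "unparseable hook output"
    approved = data.get("decision") == "allow"
    return approved, data.get("reason", "")

print(parse_decision('{"decision": "allow"}'))
print(parse_decision('{"decision": "deny", "reason": "User rejected tool execution in GUI."}'))
```

Failing closed on malformed output mirrors the bridge's own behavior: every error path above ends in a "deny" decision so the user stays in control.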

scripts/inject_tools.py (new file, 289 lines)

@@ -0,0 +1,289 @@
import os
import re

with open('mcp_client.py', 'r', encoding='utf-8') as f:
    content = f.read()

# 1. Add import os if not there
if 'import os' not in content:
    content = content.replace('import summarize', 'import os\nimport summarize')

# 2. Add the functions before "# ------------------------------------------------------------------ web tools"
functions_code = r'''
def py_find_usages(path: str, name: str) -> str:
    """Finds exact string matches of a symbol in a given file or directory."""
    p, err = _resolve_and_check(path)
    if err: return err
    try:
        import re
        pattern = re.compile(r"\b" + re.escape(name) + r"\b")
        results = []
        def _search_file(fp):
            if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return
            if not _is_allowed(fp): return
            try:
                text = fp.read_text(encoding="utf-8")
                lines = text.splitlines()
                for i, line in enumerate(lines, 1):
                    if pattern.search(line):
                        rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd())
                        results.append(f"{rel}:{i}: {line.strip()[:100]}")
            except Exception:
                pass
        if p.is_file():
            _search_file(p)
        else:
            for root, dirs, files in os.walk(p):
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
                for file in files:
                    if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
                        _search_file(Path(root) / file)
        if not results:
            return f"No usages found for '{name}' in {p}"
        if len(results) > 100:
            return "\n".join(results[:100]) + f"\n... (and {len(results)-100} more)"
        return "\n".join(results)
    except Exception as e:
        return f"ERROR finding usages for '{name}': {e}"

def py_get_imports(path: str) -> str:
    """Parses a file's AST and returns a strict list of its dependencies."""
    p, err = _resolve_and_check(path)
    if err: return err
    if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
    try:
        import ast
        code = p.read_text(encoding="utf-8")
        tree = ast.parse(code)
        imports = []
        for node in tree.body:
            if isinstance(node, ast.Import):
                for alias in node.names:
                    imports.append(alias.name)
            elif isinstance(node, ast.ImportFrom):
                module = node.module or ""
                for alias in node.names:
                    imports.append(f"{module}.{alias.name}" if module else alias.name)
        if not imports: return "No imports found."
        return "Imports:\n" + "\n".join(f"  - {i}" for i in imports)
    except Exception as e:
        return f"ERROR getting imports for '{path}': {e}"

def py_check_syntax(path: str) -> str:
    """Runs a quick syntax check on a Python file."""
    p, err = _resolve_and_check(path)
    if err: return err
    if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
    try:
        import ast
        code = p.read_text(encoding="utf-8")
        ast.parse(code)
        return f"Syntax OK: {path}"
    except SyntaxError as e:
        return f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}"
    except Exception as e:
        return f"ERROR checking syntax for '{path}': {e}"

def py_get_hierarchy(path: str, class_name: str) -> str:
    """Scans the project to find subclasses of a given class."""
    p, err = _resolve_and_check(path)
    if err: return err
    import ast
    subclasses = []
    def _search_file(fp):
        if not _is_allowed(fp): return
        try:
            code = fp.read_text(encoding="utf-8")
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    for base in node.bases:
                        if isinstance(base, ast.Name) and base.id == class_name:
                            subclasses.append(f"{fp.name}: class {node.name}({class_name})")
                        elif isinstance(base, ast.Attribute) and base.attr == class_name:
                            # Guard: base.value is only a Name for simple 'mod.Class' bases
                            prefix = base.value.id if isinstance(base.value, ast.Name) else "?"
                            subclasses.append(f"{fp.name}: class {node.name}({prefix}.{class_name})")
        except Exception:
            pass
    try:
        if p.is_file():
            _search_file(p)
        else:
            for root, dirs, files in os.walk(p):
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
                for file in files:
                    if file.endswith('.py'):
                        _search_file(Path(root) / file)
        if not subclasses:
            return f"No subclasses of '{class_name}' found in {p}"
        return f"Subclasses of '{class_name}':\n" + "\n".join(f"  - {s}" for s in subclasses)
    except Exception as e:
        return f"ERROR finding subclasses of '{class_name}': {e}"

def py_get_docstring(path: str, name: str) -> str:
    """Extracts the docstring for a specific module, class, or function."""
    p, err = _resolve_and_check(path)
    if err: return err
    if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
    try:
        import ast
        code = p.read_text(encoding="utf-8")
        tree = ast.parse(code)
        if not name or name == "module":
            doc = ast.get_docstring(tree)
            return doc if doc else "No module docstring found."
        node = _get_symbol_node(tree, name)
        if not node: return f"ERROR: could not find symbol '{name}' in {path}"
        doc = ast.get_docstring(node)
        return doc if doc else f"No docstring found for '{name}'."
    except Exception as e:
        return f"ERROR getting docstring for '{name}': {e}"

def get_tree(path: str, max_depth: int = 2) -> str:
    """Returns a directory structure up to a max depth."""
    p, err = _resolve_and_check(path)
    if err: return err
    if not p.is_dir(): return f"ERROR: not a directory: {path}"
    try:
        max_depth = int(max_depth)
        def _build_tree(dir_path, current_depth, prefix=""):
            if current_depth > max_depth: return []
            lines = []
            try:
                entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
            except PermissionError:
                return []
            # Filter out hidden/system entries and history files
            entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")]
            for i, entry in enumerate(entries):
                is_last = (i == len(entries) - 1)
                connector = "└── " if is_last else "├── "
                lines.append(f"{prefix}{connector}{entry.name}")
                if entry.is_dir():
                    extension = "    " if is_last else "│   "
                    lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
            return lines
        tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
        return "\n".join(tree_lines)
    except Exception as e:
        return f"ERROR generating tree for '{path}': {e}"

# ------------------------------------------------------------------ web tools'''
content = content.replace('# ------------------------------------------------------------------ web tools', functions_code)

# 3. Update TOOL_NAMES
old_tool_names_match = re.search(r'TOOL_NAMES\s*=\s*\{([^}]*)\}', content)
if old_tool_names_match:
    old_names = old_tool_names_match.group(1)
    new_names = old_names + ', "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"'
    content = content.replace(old_tool_names_match.group(0), f'TOOL_NAMES = {{{new_names}}}')

# 4. Update dispatch
dispatch_additions = r'''
    if tool_name == "py_find_usages":
        return py_find_usages(tool_input.get("path", ""), tool_input.get("name", ""))
    if tool_name == "py_get_imports":
        return py_get_imports(tool_input.get("path", ""))
    if tool_name == "py_check_syntax":
        return py_check_syntax(tool_input.get("path", ""))
    if tool_name == "py_get_hierarchy":
        return py_get_hierarchy(tool_input.get("path", ""), tool_input.get("class_name", ""))
    if tool_name == "py_get_docstring":
        return py_get_docstring(tool_input.get("path", ""), tool_input.get("name", ""))
    if tool_name == "get_tree":
        return get_tree(tool_input.get("path", ""), tool_input.get("max_depth", 2))
    return f"ERROR: unknown MCP tool '{tool_name}'"
'''
content = re.sub(r' return f"ERROR: unknown MCP tool \'{tool_name}\'"', dispatch_additions.strip(), content)

# 5. Update MCP_TOOL_SPECS (leading comma so the new specs chain onto the last existing entry)
mcp_tool_specs_addition = r''',
    {
        "name": "py_find_usages",
        "description": "Finds exact string matches of a symbol in a given file or directory.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Path to file or directory to search." },
                "name": { "type": "string", "description": "The symbol/string to search for." }
            },
            "required": ["path", "name"]
        }
    },
    {
        "name": "py_get_imports",
        "description": "Parses a file's AST and returns a strict list of its dependencies.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Path to the .py file." }
            },
            "required": ["path"]
        }
    },
    {
        "name": "py_check_syntax",
        "description": "Runs a quick syntax check on a Python file.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Path to the .py file." }
            },
            "required": ["path"]
        }
    },
    {
        "name": "py_get_hierarchy",
        "description": "Scans the project to find subclasses of a given class.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Directory path to search in." },
                "class_name": { "type": "string", "description": "Name of the base class." }
            },
            "required": ["path", "class_name"]
        }
    },
    {
        "name": "py_get_docstring",
        "description": "Extracts the docstring for a specific module, class, or function.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Path to the .py file." },
                "name": { "type": "string", "description": "Name of symbol or 'module' for the file docstring." }
            },
            "required": ["path", "name"]
        }
    },
    {
        "name": "get_tree",
        "description": "Returns a directory structure up to a max depth.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": { "type": "string", "description": "Directory path." },
                "max_depth": { "type": "integer", "description": "Maximum depth to recurse (default 2)." }
            },
            "required": ["path"]
        }
    }
]
'''
content = re.sub(r'\]\s*$', mcp_tool_specs_addition.strip(), content)

with open('mcp_client.py', 'w', encoding='utf-8') as f:
    f.write(content)
print("Injected new tools.")
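The usage search injected above hinges on the `\b`-anchored pattern, which matches whole identifiers only: `py_check_syntax` will not match inside `py_check_syntax_impl` because `_` is a word character. A standalone sketch of just that matching step (file walking and path checks omitted; names here are illustrative):

```python
import re

def find_usages(name: str, lines: list[str]) -> list[str]:
    """Return 'lineno: text' for lines containing `name` as a whole word."""
    pattern = re.compile(r"\b" + re.escape(name) + r"\b")
    return [f"{i}: {line.strip()}" for i, line in enumerate(lines, 1) if pattern.search(line)]

src = ["def py_check_syntax(path):", "    return py_check_syntax_impl(path)", "x = 1"]
print(find_usages("py_check_syntax", src))  # matches line 1 only
```

`re.escape` matters because tool callers may pass symbols containing regex metacharacters (e.g. a dotted name).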


@@ -11,290 +11,260 @@ import datetime
LOG_FILE = 'logs/mma_delegation.log' LOG_FILE = 'logs/mma_delegation.log'
def generate_skeleton(code: str) -> str: def generate_skeleton(code: str) -> str:
""" """
Parses Python code and replaces function/method bodies with '...', Parses Python code and replaces function/method bodies with '...',
preserving docstrings if present. preserving docstrings if present.
""" """
try: try:
PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language()) PY_LANGUAGE = tree_sitter.Language(tree_sitter_python.language())
parser = tree_sitter.Parser(PY_LANGUAGE) parser = tree_sitter.Parser(PY_LANGUAGE)
tree = parser.parse(bytes(code, "utf8")) tree = parser.parse(bytes(code, "utf8"))
edits = []
edits = [] def is_docstring(node):
if node.type == "expression_statement" and node.child_count > 0:
if node.children[0].type == "string":
return True
return False
def is_docstring(node): def walk(node):
if node.type == "expression_statement" and node.child_count > 0: if node.type == "function_definition":
if node.children[0].type == "string": body = node.child_by_field_name("body")
return True if body and body.type == "block":
return False indent = " " * body.start_point.column
first_stmt = None
def walk(node): for child in body.children:
if node.type == "function_definition": if child.type != "comment":
body = node.child_by_field_name("body") first_stmt = child
if body and body.type == "block": break
indent = " " * body.start_point.column if first_stmt and is_docstring(first_stmt):
first_stmt = None start_byte = first_stmt.end_byte
for child in body.children: end_byte = body.end_byte
if child.type != "comment": if end_byte > start_byte:
first_stmt = child edits.append((start_byte, end_byte, f"\n{indent}..."))
break else:
start_byte = body.start_byte
if first_stmt and is_docstring(first_stmt): end_byte = body.end_byte
start_byte = first_stmt.end_byte edits.append((start_byte, end_byte, "..."))
end_byte = body.end_byte for child in node.children:
if end_byte > start_byte: walk(child)
edits.append((start_byte, end_byte, f"\n{indent}...")) walk(tree.root_node)
else: edits.sort(key=lambda x: x[0], reverse=True)
start_byte = body.start_byte code_bytes = bytearray(code, "utf8")
end_byte = body.end_byte for start, end, replacement in edits:
edits.append((start_byte, end_byte, "...")) code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
for child in node.children: except Exception as e:
walk(child) return f"# Error generating skeleton: {e}\n{code}"
walk(tree.root_node)
edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8")
for start, end, replacement in edits:
code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
except Exception as e:
return f"# Error generating skeleton: {e}\n{code}"
def get_model_for_role(role: str) -> str: def get_model_for_role(role: str) -> str:
"""Returns the specific model to use for a given tier role.""" """Returns the specific model to use for a given tier role."""
if role == 'tier1-orchestrator' or role == 'tier1': if role == 'tier1-orchestrator' or role == 'tier1':
return 'gemini-3.1-pro-preview' return 'gemini-3.1-pro-preview'
elif role == 'tier2-tech-lead' or role == 'tier2': elif role == 'tier2-tech-lead' or role == 'tier2':
return 'gemini-3-flash-preview' return 'gemini-3-flash-preview'
elif role == 'tier3-worker' or role == 'tier3': elif role == 'tier3-worker' or role == 'tier3':
return 'gemini-3-flash-preview' return 'gemini-3-flash-preview'
elif role == 'tier4-qa' or role == 'tier4': elif role == 'tier4-qa' or role == 'tier4':
return 'gemini-2.5-flash-lite' return 'gemini-2.5-flash-lite'
else: else:
return 'gemini-2.5-flash-lite' return 'gemini-2.5-flash-lite'
def get_role_documents(role: str) -> list[str]: def get_role_documents(role: str) -> list[str]:
if role == 'tier1-orchestrator' or role == 'tier1': if role == 'tier1-orchestrator' or role == 'tier1':
return ['conductor/product.md', 'conductor/product-guidelines.md'] return ['conductor/product.md', 'conductor/product-guidelines.md']
elif role == 'tier2-tech-lead' or role == 'tier2': elif role == 'tier2-tech-lead' or role == 'tier2':
return ['conductor/tech-stack.md', 'conductor/workflow.md'] return ['conductor/tech-stack.md', 'conductor/workflow.md']
elif role == 'tier3-worker' or role == 'tier3': elif role == 'tier3-worker' or role == 'tier3':
return ['conductor/workflow.md'] return ['conductor/workflow.md']
return [] return []
def log_delegation(role, full_prompt, result=None, summary_prompt=None): def log_delegation(role, full_prompt, result=None, summary_prompt=None):
os.makedirs('logs/agents', exist_ok=True) os.makedirs('logs/agents', exist_ok=True)
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = f'logs/agents/mma_{role}_task_{timestamp}.log' log_file = f'logs/agents/mma_{role}_task_{timestamp}.log'
with open(log_file, 'w', encoding='utf-8') as f:
with open(log_file, 'w', encoding='utf-8') as f: f.write("==================================================\n")
f.write("==================================================\n") f.write(f"ROLE: {role}\n")
f.write(f"ROLE: {role}\n") f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"TIMESTAMP: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("--------------------------------------------------\n")
f.write("--------------------------------------------------\n") f.write(f"FULL PROMPT:\n{full_prompt}\n")
f.write(f"FULL PROMPT:\n{full_prompt}\n") f.write("--------------------------------------------------\n")
f.write("--------------------------------------------------\n") if result:
if result: f.write(f"RESULT:\n{result}\n")
f.write(f"RESULT:\n{result}\n") f.write("==================================================\n")
f.write("==================================================\n") # Also keep the master log
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
# Also keep the master log display_prompt = summary_prompt if summary_prompt else full_prompt
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) with open(LOG_FILE, 'a', encoding='utf-8') as f:
display_prompt = summary_prompt if summary_prompt else full_prompt f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
with open(LOG_FILE, 'a', encoding='utf-8') as f: return log_file
f.write(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {role}: {display_prompt[:100]}... (Log: {log_file})\n")
return log_file
def get_dependencies(filepath: str) -> list[str]: def get_dependencies(filepath: str) -> list[str]:
"""Identify top-level module imports from a Python file.""" """Identify top-level module imports from a Python file."""
try: try:
with open(filepath, 'r', encoding='utf-8') as f: with open(filepath, 'r', encoding='utf-8') as f:
tree = ast.parse(f.read()) tree = ast.parse(f.read())
dependencies = [] dependencies = []
for node in tree.body: for node in tree.body:
if isinstance(node, ast.Import): if isinstance(node, ast.Import):
for alias in node.names: for alias in node.names:
dependencies.append(alias.name.split('.')[0]) dependencies.append(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom): elif isinstance(node, ast.ImportFrom):
if node.module: if node.module:
dependencies.append(node.module.split('.')[0]) dependencies.append(node.module.split('.')[0])
seen = set() seen = set()
result = [] result = []
for d in dependencies: for d in dependencies:
if d not in seen: if d not in seen:
result.append(d) result.append(d)
seen.add(d) seen.add(d)
return result return result
except Exception as e: except Exception as e:
print(f"Error getting dependencies for {filepath}: {e}") print(f"Error getting dependencies for {filepath}: {e}")
return [] return []
def execute_agent(role: str, prompt: str, docs: list[str]) -> str: def execute_agent(role: str, prompt: str, docs: list[str]) -> str:
model = get_model_for_role(role) model = get_model_for_role(role)
# Advanced Context: Dependency skeletons for Tier 3
# Advanced Context: Dependency skeletons for Tier 3 injected_context = ""
injected_context = "" # Whitelist of modules that sub-agents have "unfettered" (full) access to.
# Whitelist of modules that sub-agents have "unfettered" (full) access to. # These will be provided in full if imported, instead of just skeletons.
# These will be provided in full if imported, instead of just skeletons. UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate']
UNFETTERED_MODULES = ['mcp_client', 'project_manager', 'events', 'aggregate'] if role in ['tier3', 'tier3-worker']:
for doc in docs:
if role in ['tier3', 'tier3-worker']: if doc.endswith('.py') and os.path.exists(doc):
for doc in docs: deps = get_dependencies(doc)
if doc.endswith('.py') and os.path.exists(doc): for dep in deps:
deps = get_dependencies(doc) # Only try to generate skeletons for files that exist in the local dir
for dep in deps: dep_file = f"{dep}.py"
# Only try to generate skeletons for files that exist in the local dir # Optimization: If the dependency is already in 'docs' (explicitly provided),
dep_file = f"{dep}.py" # do NOT inject its skeleton/full context again as a dependency.
if dep_file in docs:
# Optimization: If the dependency is already in 'docs' (explicitly provided), continue
# do NOT inject its skeleton/full context again as a dependency. if os.path.exists(dep_file) and dep_file != doc:
if dep_file in docs: try:
continue if dep in UNFETTERED_MODULES:
with open(dep_file, 'r', encoding='utf-8') as f:
if os.path.exists(dep_file) and dep_file != doc: full_content = f.read()
try: injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n"
if dep in UNFETTERED_MODULES: else:
with open(dep_file, 'r', encoding='utf-8') as f: with open(dep_file, 'r', encoding='utf-8') as f:
full_content = f.read() skeleton = generate_skeleton(f.read())
injected_context += f"\n\nFULL MODULE CONTEXT: {dep_file}\n{full_content}\n" injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n"
else: except Exception as e:
with open(dep_file, 'r', encoding='utf-8') as f: print(f"Error gathering context for {dep_file}: {e}")
skeleton = generate_skeleton(f.read()) # Check for token-bloat safety: if injected_context is too large, truncate it
injected_context += f"\n\nDEPENDENCY SKELETON: {dep_file}\n{skeleton}\n" if len(injected_context) > 15000:
except Exception as e: injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]"
print(f"Error gathering context for {dep_file}: {e}") # MMA Protocol: Tier 3 and 4 are stateless.
if role in ['tier3', 'tier3-worker']:
# Check for token-bloat safety: if injected_context is too large, truncate it system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \
if len(injected_context) > 15000: "Your goal is to implement specific code changes or tests based on the provided task. " \
injected_context = injected_context[:15000] + "... [TRUNCATED FOR COMMAND LINE LIMITS]" "You have access to tools for reading and writing files (e.g., read_file, write_file, replace), " \
"codebase investigation (discovered_tool_py_get_code_outline, discovered_tool_py_get_skeleton, discovered_tool_py_find_usages, discovered_tool_py_get_imports, discovered_tool_py_check_syntax, discovered_tool_get_tree), " \
# MMA Protocol: Tier 3 and 4 are stateless. "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \
if role in ['tier3', 'tier3-worker']: "You CAN execute PowerShell scripts via discovered_tool_run_powershell for verification and testing. " \
system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 3 Worker (Contributor). " \ "Follow TDD and return success status or code changes. No pleasantries, no conversational filler."
"Your goal is to implement specific code changes or tests based on the provided task. " \ elif role in ['tier4', 'tier4-qa']:
"You have access to tools for reading and writing files (e.g., read_file, write_file, replace), " \ system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \
"codebase investigation (discovered_tool_get_code_outline, discovered_tool_get_python_skeleton), " \ "Your goal is to analyze errors, summarize logs, or verify tests. " \
"version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ "You have access to tools for reading files, exploring the codebase (discovered_tool_py_get_code_outline, discovered_tool_py_get_skeleton, discovered_tool_py_find_usages, discovered_tool_py_get_imports), " \
"You CAN execute PowerShell scripts via discovered_tool_run_powershell for verification and testing. " \ "version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \
"Follow TDD and return success status or code changes. No pleasantries, no conversational filler." "You CAN execute PowerShell scripts via discovered_tool_run_powershell for diagnostics. " \
elif role in ['tier4', 'tier4-qa']: "ONLY output the requested analysis. No pleasantries."
system_directive = "STRICT SYSTEM DIRECTIVE: You are a stateless Tier 4 QA Agent. " \ else:
"Your goal is to analyze errors, summarize logs, or verify tests. " \ system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \
"You have access to tools for reading files, exploring the codebase (discovered_tool_get_code_outline, discovered_tool_get_python_skeleton), " \ "ONLY output the requested text. No pleasantries."
"version control (discovered_tool_get_git_diff), and web tools (discovered_tool_web_search, discovered_tool_fetch_url). " \ command_text = f"{system_directive}\n\n{injected_context}\n\n"
"You CAN execute PowerShell scripts via discovered_tool_run_powershell for diagnostics. " \ # Manually inline documents to ensure sub-agent has context in headless mode
"ONLY output the requested analysis. No pleasantries." for doc in docs:
else: if os.path.exists(doc):
system_directive = f"STRICT SYSTEM DIRECTIVE: You are a stateless {role}. " \ try:
"ONLY output the requested text. No pleasantries." with open(doc, 'r', encoding='utf-8') as f:
content = f.read()
command_text = f"{system_directive}\n\n{injected_context}\n\n" command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n"
except Exception as e:
# Manually inline documents to ensure sub-agent has context in headless mode print(f"Error inlining {doc}: {e}")
for doc in docs: command_text += f"\n\nTASK: {prompt}\n\n"
if os.path.exists(doc): # Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206.
try: # We use -p 'mma_task' to ensure non-interactive (headless) mode and valid parsing.
with open(doc, 'r', encoding='utf-8') as f: # Whitelist tools to ensure they are available to the model in headless mode.
content = f.read() allowed_tools = "read_file,write_file,replace,list_directory,glob,grep_search,discovered_tool_search_files,discovered_tool_get_file_summary,discovered_tool_py_get_skeleton,discovered_tool_py_get_code_outline,discovered_tool_py_get_definition,discovered_tool_py_update_definition,discovered_tool_py_get_signature,discovered_tool_py_set_signature,discovered_tool_py_get_class_summary,discovered_tool_py_get_var_declaration,discovered_tool_py_set_var_declaration,discovered_tool_get_git_diff,discovered_tool_run_powershell,activate_skill,codebase_investigator,discovered_tool_web_search,discovered_tool_fetch_url,discovered_tool_py_find_usages,discovered_tool_py_get_imports,discovered_tool_py_check_syntax,discovered_tool_py_get_hierarchy,discovered_tool_py_get_docstring,discovered_tool_get_tree"
command_text += f"\n\nFILE CONTENT: {doc}\n{content}\n" ps_command = (
except Exception as e: f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; "
print(f"Error inlining {doc}: {e}") f"gemini -p 'mma_task' --allowed-tools {allowed_tools} --output-format json --model {model}"
)
command_text += f"\n\nTASK: {prompt}\n\n" cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command]
try:
# Use subprocess with input to pipe the prompt via stdin, avoiding WinError 206. process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8')
# We use -p 'mma_task' to ensure non-interactive (headless) mode and valid parsing. result = process.stdout
# Whitelist tools to ensure they are available to the model in headless mode. if not process.stdout and process.stderr:
allowed_tools = "read_file,write_file,replace,list_directory,glob,grep_search,discovered_tool_search_files,discovered_tool_get_file_summary,discovered_tool_get_python_skeleton,discovered_tool_get_code_outline,discovered_tool_get_git_diff,discovered_tool_run_powershell,activate_skill,codebase_investigator,discovered_tool_web_search,discovered_tool_fetch_url" result = f"Error: {process.stderr}"
ps_command = ( # Log the attempt and result
f"if (Test-Path 'C:\\projects\\misc\\setup_gemini.ps1') {{ . 'C:\\projects\\misc\\setup_gemini.ps1' }}; " log_file = log_delegation(role, command_text, result, summary_prompt=prompt)
f"gemini -p 'mma_task' --allowed-tools {allowed_tools} --output-format json --model {model}" print(f"Sub-agent log created: {log_file}")
) stdout = process.stdout
cmd = ['powershell.exe', '-NoProfile', '-Command', ps_command] start_index = stdout.find('{')
if start_index != -1:
json_str = stdout[start_index:]
try: try:
process = subprocess.run(cmd, input=command_text, capture_output=True, text=True, encoding='utf-8') data = json.loads(json_str)
return data.get('response', stdout)
result = process.stdout except json.JSONDecodeError:
if not process.stdout and process.stderr: return stdout
result = f"Error: {process.stderr}" return stdout
except Exception as e:
# Log the attempt and result err_msg = f"Execution failed: {str(e)}"
log_file = log_delegation(role, command_text, result, summary_prompt=prompt) log_delegation(role, command_text, err_msg)
print(f"Sub-agent log created: {log_file}") return err_msg
stdout = process.stdout
start_index = stdout.find('{')
if start_index != -1:
json_str = stdout[start_index:]
try:
data = json.loads(json_str)
return data.get('response', stdout)
except json.JSONDecodeError:
return stdout
return stdout
except Exception as e:
err_msg = f"Execution failed: {str(e)}"
log_delegation(role, command_text, err_msg)
return err_msg
def create_parser(): def create_parser():
parser = argparse.ArgumentParser(description="MMA Execution Script") parser = argparse.ArgumentParser(description="MMA Execution Script")
parser.add_argument( parser.add_argument(
"--role", "--role",
  choices=['tier1', 'tier2', 'tier3', 'tier4', 'tier1-orchestrator', 'tier2-tech-lead', 'tier3-worker', 'tier4-qa'],
  help="The tier role to execute"
 )
 parser.add_argument(
  "--task-file",
  type=str,
  help="TOML file defining the task"
 )
 parser.add_argument(
  "prompt",
  type=str,
  nargs='?',
  help="The prompt for the tier (optional if --task-file is used)"
 )
 return parser

def main():
 parser = create_parser()
 args = parser.parse_args()
 role = args.role
 prompt = args.prompt
 docs = []
 if args.task_file and os.path.exists(args.task_file):
  with open(args.task_file, "rb") as f:
   task_data = tomllib.load(f)
  role = task_data.get("role", role)
  prompt = task_data.get("prompt", prompt)
  docs = task_data.get("docs", [])
 if not role or not prompt:
  parser.print_help()
  return
 if not docs:
  docs = get_role_documents(role)
 # Extract @file references from the prompt
 import re
 file_refs = re.findall(r"@([\w./\\]+)", prompt)
 for ref in file_refs:
  if os.path.exists(ref) and ref not in docs:
   docs.append(ref)
 print(f"Executing role: {role} with docs: {docs}")
 result = execute_agent(role, prompt, docs)
 print(result)

if __name__ == "__main__":
 main()
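The `@file` convention in `main()` can be exercised in isolation. This is a minimal sketch using the same regex as above; the helper name and sample prompt are illustrative, not from the repo:

```python
import re

def extract_file_refs(prompt: str) -> list[str]:
    # Same pattern as in main(): captures word characters, dots,
    # forward slashes, and backslashes following an '@'
    return re.findall(r"@([\w./\\]+)", prompt)

refs = extract_file_refs("Refactor @src/agent.py and update @docs/plan.md accordingly")
print(refs)  # ['src/agent.py', 'docs/plan.md']
```

Note that the character class stops at whitespace, so a reference followed by punctuation other than `.`/`/`/`\` ends cleanly.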

scripts/slice_tools.py (new file)
@@ -0,0 +1,64 @@
import sys
import ast
def get_slice(filepath, start_line, end_line):
with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
start_idx = int(start_line) - 1
end_idx = int(end_line)
return "".join(lines[start_idx:end_idx])
def set_slice(filepath, start_line, end_line, new_content):
with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
start_idx = int(start_line) - 1
end_idx = int(end_line)
if new_content and not new_content.endswith(chr(10)):
new_content += chr(10)
new_lines = new_content.splitlines(True) if new_content else []
lines[start_idx:end_idx] = new_lines
with open(filepath, 'w', encoding='utf-8', newline='') as f:
f.writelines(lines)
def get_def(filepath, symbol_name):
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
if node.name == symbol_name:
start = node.lineno
end = node.end_lineno
if node.decorator_list:
start = node.decorator_list[0].lineno
slice_content = get_slice(filepath, start, end)
return f"{start},{end}{chr(10)}{slice_content}"
return "NOT_FOUND"
def set_def(filepath, symbol_name, new_content):
res = get_def(filepath, symbol_name)
if res == "NOT_FOUND":
print(f"Error: Symbol '{symbol_name}' not found in {filepath}")
sys.exit(1)
lines_info = res.split(chr(10), 1)[0]
start, end = lines_info.split(',')
set_slice(filepath, start, end, new_content)
print(f"Successfully updated '{symbol_name}' (lines {start}-{end}) in {filepath}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python slice_tools.py <command> [args...]")
sys.exit(1)
cmd = sys.argv[1]
if cmd == "get_slice":
print(get_slice(sys.argv[2], sys.argv[3], sys.argv[4]), end="")
elif cmd == "set_slice":
with open(sys.argv[5], 'r', encoding='utf-8') as f:
new_content = f.read()
set_slice(sys.argv[2], sys.argv[3], sys.argv[4], new_content)
elif cmd == "get_def":
print(get_def(sys.argv[2], sys.argv[3]), end="")
elif cmd == "set_def":
with open(sys.argv[4], 'r', encoding='utf-8') as f:
new_content = f.read()
set_def(sys.argv[2], sys.argv[3], new_content)
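To confirm the 1-based, inclusive line semantics that `get_def`/`set_def` rely on, here is a self-contained sketch that mirrors the `get_slice` logic against a throwaway file (the sample content is illustrative):

```python
import os
import tempfile

def get_slice(filepath, start_line, end_line):
    # Mirrors scripts/slice_tools.get_slice: 1-based, inclusive line range
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    return "".join(lines[int(start_line) - 1:int(end_line)])

with tempfile.NamedTemporaryFile('w', suffix='.py', delete=False, encoding='utf-8') as tf:
    tf.write("a = 1\nb = 2\nc = 3\n")
    path = tf.name
snippet = get_slice(path, 2, 3)
os.remove(path)
print(snippet)  # b = 2 / c = 3
```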

scripts/temp_def.py (new file)
@@ -0,0 +1,97 @@
import io
import tokenize

def format_code(source: str) -> str:
"""
Formats Python code to use exactly 1 space for indentation (including continuations),
max 1 blank line between top-level definitions, and 0 blank lines inside
function/method bodies.
Args:
source: The Python source code to format.
Returns:
The formatted source code.
"""
if not source:
return ""
tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
lines = source.splitlines(keepends=True)
num_lines = len(lines)
block_level = 0
paren_level = 0
in_function_stack = []
expecting_function_indent = False
line_indent = {}
line_is_blank = {i: True for i in range(1, num_lines + 2)}
line_is_string_interior = {i: False for i in range(1, num_lines + 2)}
line_seen = set()
pending_blank_lines = []
for tok in tokens:
t_type = tok.type
t_string = tok.string
start_line, _ = tok.start
end_line, _ = tok.end
if t_type == tokenize.STRING:
for l in range(start_line + 1, end_line + 1):
line_is_string_interior[l] = True
if t_type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT, tokenize.ENDMARKER):
for l in range(start_line, end_line + 1):
line_is_blank[l] = False
pending_blank_lines = [] # Real content seen, clear pending blanks
# State updates that affect CURRENT line
if t_type == tokenize.INDENT:
block_level += 1
if expecting_function_indent:
in_function_stack.append(block_level)
expecting_function_indent = False
elif t_type == tokenize.DEDENT:
block_level -= 1
if in_function_stack and block_level < in_function_stack[-1]:
in_function_stack.pop()
# Retroactively update pending blank lines to the current (outer) level
for l in pending_blank_lines:
line_indent[l] = block_level + paren_level
if t_string in (')', ']', '}'):
paren_level -= 1
if start_line not in line_seen:
line_indent[start_line] = block_level + paren_level
if t_type not in (tokenize.INDENT, tokenize.DEDENT):
line_seen.add(start_line)
if t_type in (tokenize.NL, tokenize.NEWLINE):
pending_blank_lines.append(start_line)
# State updates that affect FUTURE lines/tokens
if t_type == tokenize.NAME and t_string == 'def':
expecting_function_indent = True
if t_string in ('(', '[', '{'):
paren_level += 1
output = []
consecutive_blanks = 0
for i in range(1, num_lines + 1):
if line_is_string_interior[i]:
output.append(lines[i-1])
continue
if line_is_blank[i]:
indent = line_indent.get(i, 0)
if indent > 0:
continue
else:
if consecutive_blanks < 1:
output.append("\n")
consecutive_blanks += 1
continue
original_line = lines[i-1]
indent = line_indent.get(i, 0)
stripped = original_line.lstrip()
# Enforce a 1-line gap before definitions/classes
is_def_start = stripped.startswith(('def ', 'class ', 'async def ', '@'))
if is_def_start and output and consecutive_blanks == 0:
prev_line = output[-1].strip()
# Don't add a gap if immediately following a block opener or another decorator
if prev_line and not prev_line.endswith(':') and not prev_line.startswith('@'):
output.append("\n")
consecutive_blanks += 1
consecutive_blanks = 0
output.append(" " * indent + stripped)
if not stripped.endswith('\n') and i < num_lines:
output[-1] += '\n'
if output and not output[-1].endswith('\n'):
output[-1] += '\n'
return "".join(output)
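The blank-line bookkeeping in `format_code` hinges on `tokenize` emitting `NL` tokens for blank lines while representing a triple-quoted string as a single `STRING` token spanning several lines. A minimal illustration of that distinction (separate from the function above):

```python
import io
import tokenize

src = "x = 1\n\ns = '''a\n\nb'''\n"
blank = {i: True for i in range(1, 6)}
string_interior = set()
for tok in tokenize.generate_tokens(io.StringIO(src).readline):
    if tok.type == tokenize.STRING:
        # Interior lines of a multi-line string must not be treated as blanks
        string_interior.update(range(tok.start[0] + 1, tok.end[0] + 1))
    if tok.type not in (tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
                        tokenize.DEDENT, tokenize.ENDMARKER):
        for line in range(tok.start[0], tok.end[0] + 1):
            blank[line] = False

print(blank[2], 4 in string_interior)  # True True
```

Line 2 stays blank (only an `NL` token touches it), while line 4 sits inside the string literal, which is exactly why `format_code` keeps a separate `line_is_string_interior` map.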


@@ -11,44 +11,40 @@ sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
try:
 import mcp_client
 import shell_runner
except ImportError:
 print(json.dumps({"error": "Failed to import required modules"}))
 sys.exit(1)

def main():
 if len(sys.argv) < 2:
  print(json.dumps({"error": "No tool name provided"}))
  sys.exit(1)
 tool_name = sys.argv[1]
 # Read arguments from stdin
 try:
  input_data = sys.stdin.read()
  if input_data:
   tool_input = json.loads(input_data)
  else:
   tool_input = {}
 except json.JSONDecodeError:
  print(json.dumps({"error": "Invalid JSON input"}))
  sys.exit(1)
 try:
  if tool_name == "run_powershell":
   script = tool_input.get("script", "")
   # We use the current directory as base_dir for CLI calls
   result = shell_runner.run_powershell(script, os.getcwd())
  else:
   # mcp_client tools generally resolve paths relative to CWD if not configured.
   result = mcp_client.dispatch(tool_name, tool_input)
  # We print the raw result string as that's what gemini-cli expects.
  print(result)
 except Exception as e:
  print(f"ERROR executing tool {tool_name}: {e}")
  sys.exit(1)

if __name__ == "__main__":
 main()
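The stdin handling above treats an empty payload as an empty argument dict rather than a JSON error. That branch can be isolated and checked; the helper name here is mine, not part of the script:

```python
import json

def parse_tool_input(raw: str) -> dict:
    # Mirrors the dispatcher: empty stdin means "no arguments", not invalid JSON
    if raw:
        return json.loads(raw)
    return {}

print(parse_tool_input(''))                  # {}
print(parse_tool_input('{"script": "ls"}'))  # {'script': 'ls'}
```

Only a non-empty, malformed payload reaches `json.loads` and triggers the `JSONDecodeError` error path.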


@@ -6,44 +6,41 @@ import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
try:
 import mcp_client
except ImportError as e:
 # Print the error to stderr to diagnose
 print(f"ImportError in discovery: {e}", file=sys.stderr)
 print("[]")
 sys.exit(0)

def main():
 specs = list(mcp_client.MCP_TOOL_SPECS)
 # Add run_powershell (manually define to match ai_client.py)
 specs.append({
  "name": "run_powershell",
  "description": (
   "Run a PowerShell script within the project base_dir. "
   "Use this to create, edit, rename, or delete files and directories. "
   "The working directory is set to base_dir automatically. "
   "stdout and stderr are returned to you as the result."
  ),
  "parameters": {
   "type": "object",
   "properties": {
    "script": {
     "type": "string",
     "description": "The PowerShell script to execute."
    }
   },
   "required": ["script"]
  }
 })
 # Rename 'parameters' to 'parametersJsonSchema' for Gemini CLI
 for spec in specs:
  if "parameters" in spec:
   spec["parametersJsonSchema"] = spec.pop("parameters")
 # Output as JSON array of FunctionDeclarations
 print(json.dumps(specs, indent=2))

if __name__ == "__main__":
 main()
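The `parameters` → `parametersJsonSchema` rename above is a one-way, in-place key move. Sketched on a standalone spec dict (trimmed from the full schema for brevity):

```python
spec = {
    "name": "run_powershell",
    "parameters": {"type": "object", "properties": {"script": {"type": "string"}}},
}

# Gemini CLI expects 'parametersJsonSchema' in place of the usual 'parameters' key
if "parameters" in spec:
    spec["parametersJsonSchema"] = spec.pop("parameters")

print(sorted(spec))  # ['name', 'parametersJsonSchema']
```

Because `dict.pop` removes the old key, running the loop twice is harmless: the `in` guard simply skips specs already renamed.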


@@ -4,7 +4,6 @@ import sys
def get_missing_hints(file_path: str):
 with open(file_path, "r", encoding="utf-8") as f:
  tree = ast.parse(f.read())
 missing = []
 for node in ast.walk(tree):
  if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
@@ -15,7 +14,7 @@ def get_missing_hints(file_path: str):
    "name": node.name,
    "lineno": node.lineno,
    "col_offset": node.col_offset
   })
 return missing

if __name__ == "__main__":