Files
manual_slop/src/mcp_client.py

1604 lines
52 KiB
Python

# mcp_client.py
"""
MCP Client - Multi-tool filesystem and network operations with sandboxing.
This module implements a Model Context Protocol (MCP)-like interface for AI
agents to interact with the filesystem and network. It provides 26 tools
with a three-layer security model to prevent unauthorized access.
Three-Layer Security Model:
1. Allowlist Construction (configure()):
- Builds _allowed_paths from project file_items
- Populates _base_dirs from file parents and extra_base_dirs
- Sets _primary_base_dir for relative path resolution
2. Path Validation (_is_allowed()):
- Blacklist check: history.toml, *_history.toml, config, credentials
- Explicit allowlist check: _allowed_paths membership
- CWD fallback: allows cwd() subpaths if no base_dirs configured
- Base directory containment: must be subpath of _base_dirs
3. Resolution Gate (_resolve_and_check()):
- Converts relative paths using _primary_base_dir
- Resolves symlinks to prevent traversal attacks
- Returns (resolved_path, error_message) tuple
Tool Categories:
- File I/O: read_file, list_directory, search_files, get_tree, get_file_slice
- Surgical Edits: set_file_slice, edit_file
- AST-Based (Python): py_get_skeleton, py_get_code_outline, py_get_definition,
py_update_definition, py_get_signature, py_set_signature, py_get_class_summary,
py_get_var_declaration, py_set_var_declaration
- Analysis: get_file_summary, get_git_diff, py_find_usages, py_get_imports,
py_check_syntax, py_get_hierarchy, py_get_docstring
- Network: web_search, fetch_url
- Runtime: get_ui_performance
Mutating Tools:
The MUTATING_TOOLS frozenset defines tools that modify files. ai_client.py
checks this set and routes to pre_tool_callback (GUI approval) if present.
Thread Safety:
This module uses module-level global state (_allowed_paths, _base_dirs).
Call configure() before dispatch() in multi-threaded environments.
See Also:
- docs/guide_tools.md for complete tool inventory and security model
- src/ai_client.py for tool dispatch integration
- src/shell_runner.py for PowerShell execution
"""
# mcp_client.py
# MCP-style file context tools for manual_slop.
from __future__ import annotations
import asyncio
import json
from src import models
from pathlib import Path
from typing import Optional, Callable, Any, cast
import os
import ast
import subprocess
from src import summarize
from src import outline_tool
import urllib.request
import urllib.parse
from html.parser import HTMLParser
import re as _re
# ------------------------------------------------------------------ mutating tools sentinel
# Tools that write or modify files. ai_client checks this set before dispatch
# and routes to pre_tool_callback (GUI approval) if the tool name is present.
# Names of the tools in this module that write to files.
MUTATING_TOOLS: frozenset[str] = frozenset((
    "edit_file",
    "py_set_signature",
    "py_set_var_declaration",
    "py_update_definition",
    "set_file_slice",
))
# ------------------------------------------------------------------ state
# Set by configure() before the AI send loop starts.
# allowed_paths : set of resolved absolute Path objects (files or dirs)
# base_dirs : set of resolved absolute Path dirs that act as roots
# Files that were explicitly listed in the project (resolved, absolute).
_allowed_paths: set[Path] = set()
# Directory roots under which any path is accepted (resolved, absolute).
_base_dirs: set[Path] = set()
# Root used to turn relative tool paths into absolute ones.
_primary_base_dir: Optional[Path] = None
# Injected by gui_2.py - returns a dict of performance metrics
perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None
def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | None = None) -> None:
"""
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
"""
global _allowed_paths, _base_dirs, _primary_base_dir
_allowed_paths = set()
_base_dirs = set()
_primary_base_dir = Path(extra_base_dirs[0]).resolve() if extra_base_dirs else Path.cwd()
for item in file_items:
p = item.get("path")
if p is not None:
try:
rp = Path(p).resolve(strict=True)
except (OSError, ValueError):
rp = Path(p).resolve()
_allowed_paths.add(rp)
_base_dirs.add(rp.parent)
if extra_base_dirs:
for d in extra_base_dirs:
dp = Path(d).resolve()
if dp.is_dir():
_base_dirs.add(dp)
def _is_allowed(path: Path) -> bool:
    """
    Return True if `path` is within the allowlist.

    Checks, in order:
    1. Blacklist: the resolved path must not be the config or credentials
       file, and neither the given name nor the resolved name may be
       history.toml / *_history.toml.
    2. Explicit allowlist: resolved path is a member of _allowed_paths.
    3. Containment: resolved path lies under one of _base_dirs, or under
       the current working directory when no base dirs are configured.

    The path is resolved (symlinks followed) before comparison so a symlink
    cannot be used to escape the sandbox.
    """
    from src.paths import get_config_path
    from src.ai_client import get_credentials_path
    try:
        rp = path.resolve(strict=True)
    except (OSError, ValueError):
        rp = path.resolve()
    # Blacklist check by resolved path
    if rp == get_config_path().resolve():
        return False
    if rp == get_credentials_path().resolve():
        return False
    # Check the name as given AND the name after symlink resolution, so a
    # link called e.g. "notes.toml" that points at history.toml is rejected.
    for name in {path.name.lower(), rp.name.lower()}:
        if name == "history.toml" or name.endswith("_history.toml"):
            return False
    if rp in _allowed_paths:
        return True
    # With no configured roots, fall back to the current working directory.
    roots = _base_dirs if _base_dirs else {Path.cwd().resolve()}
    for root in roots:
        try:
            rp.relative_to(root)
            return True
        except ValueError:
            continue
    return False
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
    """
    Resolve raw_path and verify it passes the allowlist check.

    Relative paths are joined onto _primary_base_dir (when set) before
    resolving. Returns (resolved_path, "") on success, or (None, message)
    when the path is invalid or rejected by _is_allowed().
    """
    try:
        p = Path(raw_path)
        if not p.is_absolute() and _primary_base_dir:
            p = _primary_base_dir / p
        p = p.resolve()
    except Exception as e:
        return None, f"ERROR: invalid path '{raw_path}': {e}"
    if not _is_allowed(p):
        # Real newlines (not a literal backslash-n) so the list is readable;
        # sorted so the message is stable between calls.
        allowed_bases = "\n".join(f" - {d}" for d in sorted(_base_dirs))
        return None, (
            f"ACCESS DENIED: '{raw_path}' resolves to '{p}', which is not within the allowed paths.\n"
            f"Allowed base directories are:\n{allowed_bases}"
        )
    return p, ""
# ------------------------------------------------------------------ tool implementations
def read_file(path: str) -> str:
    """
    Read a sandboxed file as UTF-8 text.

    Returns the file content, or an error string when the path is rejected,
    missing, not a regular file, or cannot be read.
    """
    target, denied = _resolve_and_check(path)
    if denied or target is None:
        return denied
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        with target.open("r", encoding="utf-8") as handle:
            return handle.read()
    except Exception as exc:
        return f"ERROR reading '{path}': {exc}"
def list_directory(path: str) -> str:
    """
    Produce a text listing of a sandboxed directory.

    Directories are listed before files, each group sorted case-insensitively
    by name. History files (history.toml / *_history.toml) are omitted.
    Returns the listing, or an error string.
    """
    target, denied = _resolve_and_check(path)
    if denied or target is None:
        return denied
    if not target.exists():
        return f"ERROR: path not found: {path}"
    if not target.is_dir():
        return f"ERROR: not a directory: {path}"

    def _order(item):
        # False sorts before True, so directories come first.
        return (item.is_file(), item.name.lower())

    try:
        rows = [f"Directory: {target}", ""]
        shown = 0
        for entry in sorted(target.iterdir(), key=_order):
            lowered = entry.name.lower()
            if lowered == "history.toml" or lowered.endswith("_history.toml"):
                continue
            if entry.is_file():
                kind = "file"
                size = f"{entry.stat().st_size:>10,} bytes"
            else:
                kind = "dir "
                size = ""
            rows.append(f" [{kind}] {entry.name:<40} {size}")
            shown += 1
        rows.append(f" ({shown} entries)")
        return "\n".join(rows)
    except Exception as exc:
        return f"ERROR listing '{path}': {exc}"
def search_files(path: str, pattern: str) -> str:
    """
    Search for files matching a glob pattern within path.

    pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'

    Every match is re-validated with _is_allowed() so a pattern containing
    '..' cannot be used to enumerate entries outside the sandbox. History
    files are never reported. Returns the match list or an error string.
    """
    p, err = _resolve_and_check(path)
    if err or p is None:
        return err
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        matches = sorted(p.glob(pattern))
        if not matches:
            return f"No files matched '{pattern}' in {path}"
        lines = [f"Search '{pattern}' in {p}:", ""]
        count = 0
        for m in matches:
            # Blacklist check
            name = m.name.lower()
            if name == "history.toml" or name.endswith("_history.toml"):
                continue
            # The glob itself is attacker-controlled; drop anything that
            # resolves outside the allowed roots.
            if not _is_allowed(m):
                continue
            rel = m.relative_to(p)
            kind = "file" if m.is_file() else "dir "
            lines.append(f" [{kind}] {rel}")
            count += 1
        lines.append(f" ({count} match(es))")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching '{path}': {e}"
def get_file_summary(path: str) -> str:
    """
    Summarise a sandboxed file via summarize.summarise_file().

    The file is read as UTF-8 and handed to the summariser together with
    its resolved path. Returns the summary, or an error string when the
    path is rejected, missing, not a file, or summarising fails.
    """
    target, denied = _resolve_and_check(path)
    if denied or target is None:
        return denied
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
        summary = summarize.summarise_file(target, text)
    except Exception as exc:
        return f"ERROR summarising '{path}': {exc}"
    return summary
def py_get_skeleton(path: str) -> str:
    """
    Return the skeleton of a Python file as produced by ASTParser.get_skeleton().

    Only existing *.py files inside the sandbox are accepted; anything else
    yields an error string.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    is_python = target.is_file() and target.suffix == ".py"
    if not is_python:
        return f"ERROR: not a python file: {path}"
    try:
        # Imported lazily, as in the rest of this module's optional helpers.
        from src.file_cache import ASTParser
        source = target.read_text(encoding="utf-8")
        return ASTParser("python").get_skeleton(source)
    except Exception as exc:
        return f"ERROR generating skeleton for '{path}': {exc}"
def py_get_code_outline(path: str) -> str:
    """
    Return the outline of a code file as produced by outline_tool.get_outline().

    Returns an error string when the path is rejected, missing, not a file,
    or the outline cannot be generated.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        source = target.read_text(encoding="utf-8")
        outline = outline_tool.get_outline(target, source)
    except Exception as exc:
        return f"ERROR generating outline for '{path}': {exc}"
    return outline
def get_file_slice(path: str, start_line: int, end_line: int) -> str:
    """
    Return a specific line range from a file.

    start_line and end_line are 1-based and inclusive. Lines past the end
    of the file are ignored. Returns the joined lines (with their line
    endings), or an error string.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.exists():
        return f"ERROR: file not found: {path}"
    try:
        # A start below 1 would become a negative index and silently return
        # lines counted from the END of the file.
        if start_line < 1:
            return f"ERROR: start_line must be >= 1 (got {start_line})"
        lines = p.read_text(encoding="utf-8").splitlines(keepends=True)
        start_idx = start_line - 1
        end_idx = end_line
        return "".join(lines[start_idx:end_idx])
    except Exception as e:
        return f"ERROR reading slice from '{path}': {e}"
def set_file_slice(path: str, start_line: int, end_line: int, new_content: str) -> str:
    """
    Replace a specific line range in a file with new content.

    start_line and end_line are 1-based and inclusive. end_line may equal
    start_line - 1, which inserts new_content before start_line without
    removing anything. A trailing newline is appended to non-empty
    new_content when missing; empty new_content deletes the range.
    Returns a success message or an error string.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.exists():
        return f"ERROR: file not found: {path}"
    try:
        # Negative indices would address lines from the end of the file and
        # corrupt unrelated content, so reject them up front.
        if start_line < 1:
            return f"ERROR: start_line must be >= 1 (got {start_line})"
        if end_line < start_line - 1:
            return f"ERROR: end_line ({end_line}) must be >= start_line - 1 ({start_line - 1})"
        lines = p.read_text(encoding="utf-8").splitlines(keepends=True)
        start_idx = start_line - 1
        end_idx = end_line
        if new_content and not new_content.endswith("\n"):
            new_content += "\n"
        new_lines = new_content.splitlines(keepends=True) if new_content else []
        # Appending after a final line that has no newline would glue the
        # new text onto that line; terminate it first.
        if new_lines and lines and start_idx >= len(lines) and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        lines[start_idx:end_idx] = new_lines
        p.write_text("".join(lines), encoding="utf-8")
        return f"Successfully updated lines {start_line}-{end_line} in {path}"
    except Exception as e:
        return f"ERROR updating slice in '{path}': {e}"
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
    """
    Replace an exact substring in a file.

    old_string must be non-empty and present in the file. When it occurs
    more than once, the edit is refused unless replace_all is true, in which
    case every occurrence is replaced. Returns a success message or an
    error string.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not old_string:
        return "ERROR: old_string cannot be empty"
    try:
        original = target.read_text(encoding="utf-8")
        hits = original.count(old_string)
        if hits == 0:
            return f"ERROR: old_string not found in '{path}'"
        if hits > 1 and not replace_all:
            return f"ERROR: Found {hits} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique."
        # A negative limit means "replace every occurrence".
        limit = -1 if replace_all else 1
        target.write_text(original.replace(old_string, new_string, limit), encoding="utf-8")
        if replace_all:
            return f"Successfully replaced {hits} occurrences in '{path}'"
        return f"Successfully replaced 1 occurrence in '{path}'"
    except Exception as exc:
        return f"ERROR editing '{path}': {exc}"
def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]:
"""Helper to find an AST node by name (Class, Function, or Variable). Supports dot notation."""
parts = name.split(".")
def find_in_scope(scope_node: Any, target_name: str) -> Optional[ast.AST]:
# scope_node could be Module, ClassDef, or FunctionDef
body = getattr(scope_node, "body", [])
for node in body:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
return node
if isinstance(node, ast.Assign):
for t in node.targets:
if isinstance(t, ast.Name) and t.id == target_name:
return node
if isinstance(node, ast.AnnAssign):
if isinstance(node.target, ast.Name) and node.target.id == target_name:
return node
return None
current = tree
for part in parts:
found = find_in_scope(current, part)
if not found:
return None
current = found
return current
def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
    """
    Look up a definition and return its source together with its first line.

    On success returns (source_code, line_number), where line_number is the
    1-based `lineno` of the matching AST node. On any failure returns an
    error string instead.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        # Drop a leading BOM before parsing.
        source = target.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        source_lines = source.splitlines(keepends=True)
        found = _get_symbol_node(ast.parse(source), name)
        if not found:
            return f"ERROR: definition '{name}' not found in {path}"
        first_line = cast(int, getattr(found, "lineno"))
        last_line = cast(int, getattr(found, "end_lineno"))
        snippet = "".join(source_lines[first_line - 1:last_line])
        return (snippet, first_line)
    except Exception as exc:
        return f"ERROR retrieving definition '{name}' from '{path}': {exc}"
def py_get_definition(path: str, name: str) -> str:
    """
    Return the source text of one definition in a .py file.

    path: Path to the code file (must have a .py suffix).
    name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').

    The text spans the node's `lineno` through `end_lineno`. Returns an
    error string when the file or the definition cannot be found.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    if target.suffix != ".py":
        return f"ERROR: py_get_definition currently only supports .py files (unsupported: {target.suffix})"
    try:
        # Drop a leading BOM before parsing.
        source = target.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        source_lines = source.splitlines(keepends=True)
        found = _get_symbol_node(ast.parse(source), name)
        if not found:
            return f"ERROR: definition '{name}' not found in {path}"
        first_idx = cast(int, getattr(found, "lineno")) - 1
        stop_idx = cast(int, getattr(found, "end_lineno"))
        return "".join(source_lines[first_idx:stop_idx])
    except Exception as exc:
        return f"ERROR retrieving definition '{name}' from '{path}': {exc}"
def py_update_definition(path: str, name: str, new_content: str) -> str:
    """
    Replace the lines of one definition with new_content.

    The definition is located with _get_symbol_node(); the lines from its
    `lineno` through `end_lineno` are then rewritten via set_file_slice(),
    whose result message is returned. Returns an error string on failure.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    if not target.exists():
        return f"ERROR: file not found: {path}"
    try:
        # Drop a leading BOM before parsing.
        source = target.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        found = _get_symbol_node(ast.parse(source), name)
        if not found:
            return f"ERROR: could not find definition '{name}' in {path}"
        first_line = cast(int, getattr(found, "lineno"))
        last_line = cast(int, getattr(found, "end_lineno"))
        return set_file_slice(path, first_line, last_line, new_content)
    except Exception as exc:
        return f"ERROR updating definition '{name}' in '{path}': {exc}"
def _signature_header_from_tokens(lines: list[str], start: int) -> str:
    """
    Return the text from lines[start] up to and including the colon that
    ends the `def` header, or "" when it cannot be determined.

    The header's colon is the first ':' token outside any brackets, so
    colons in annotations, defaults, and lambdas inside the parameter list
    are skipped.
    """
    import io
    import tokenize
    source = "".join(lines[start:])
    rows = io.StringIO(source).readlines()
    depth = 0
    try:
        for tok in tokenize.generate_tokens(io.StringIO(source).readline):
            if tok.type != tokenize.OP:
                continue
            if tok.string in ("(", "[", "{"):
                depth += 1
            elif tok.string in (")", "]", "}"):
                depth -= 1
            elif tok.string == ":" and depth == 0:
                row, col = tok.end  # row is 1-based within `source`
                return ("".join(rows[:row - 1]) + rows[row - 1][:col]).strip()
    except (tokenize.TokenError, SyntaxError):
        # Includes IndentationError; the caller falls back to a heuristic.
        pass
    return ""


def py_get_signature(path: str, name: str) -> str:
    """
    Returns only the signature part of a function or method (def line until colon).

    Works for multi-line signatures, for one-line definitions such as
    `def f(a: int) -> str: return ...`, and for headers followed by a
    trailing comment. Returns an error string when the function cannot be
    found or the file cannot be parsed.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.exists():
        return f"ERROR: file not found: {path}"
    try:
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        lines = code.splitlines(keepends=True)
        tree = ast.parse(code)
        node = _get_symbol_node(tree, name)
        if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return f"ERROR: could not find function/method '{name}' in {path}"
        start = node.lineno - 1
        body_start = node.body[0].lineno - 1
        # Common case: everything before the first body statement is the header.
        sig = "".join(lines[start:body_start]).strip()
        if sig.endswith(":"):
            return sig
        # One-line defs, trailing comments, or decorators on the first body
        # statement: find the header's terminating colon by tokenizing.
        header = _signature_header_from_tokens(lines, start)
        if header:
            return header
        # Last resort: cut at the first colon on the def line.
        full_line = lines[start]
        colon_idx = full_line.find(":")
        if colon_idx != -1:
            return full_line[:colon_idx + 1].strip()
        return sig
    except Exception as e:
        return f"ERROR retrieving signature '{name}' from '{path}': {e}"
def py_set_signature(path: str, name: str, new_signature: str) -> str:
    """
    Surgically replace only the signature of a function/method.

    For a normal definition, the lines from the `def` line up to (not
    including) the first body statement are replaced with new_signature.
    Comments or blank lines between the header and the body fall inside
    that range and are replaced too.

    For a one-line definition (`def f(): body`), the body text is kept and
    re-attached after new_signature on the same line.

    Returns set_file_slice()'s result message, or an error string.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.exists():
        return f"ERROR: file not found: {path}"
    try:
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        tree = ast.parse(code)
        node = _get_symbol_node(tree, name)
        if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return f"ERROR: could not find function/method '{name}' in {path}"
        start = node.lineno
        first_stmt = node.body[0]
        if first_stmt.lineno == start:
            # Header and body share a line. Replacing "start .. start-1"
            # would INSERT the new signature and leave the old def behind,
            # so rebuild the single line instead. col_offset is a UTF-8
            # byte offset, hence the encode/decode round trip.
            def_line = code.splitlines(keepends=True)[start - 1]
            body_text = def_line.encode("utf-8")[first_stmt.col_offset:].decode("utf-8")
            return set_file_slice(path, start, start, new_signature.rstrip() + " " + body_text)
        end = first_stmt.lineno - 1
        return set_file_slice(path, start, end, new_signature)
    except Exception as e:
        return f"ERROR updating signature '{name}' in '{path}': {e}"
def py_get_class_summary(path: str, name: str) -> str:
    """
    Returns a summary of a class: its methods and their signatures.

    The summary starts with "Class: <name>", followed by the class
    docstring (when present) and one " - <signature>" line per method
    defined directly in the class body. Returns an error string when the
    class cannot be found or the file cannot be parsed.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.exists():
        return f"ERROR: file not found: {path}"
    try:
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        tree = ast.parse(code)
        node = _get_symbol_node(tree, name)
        if not node or not isinstance(node, ast.ClassDef):
            return f"ERROR: could not find class '{name}' in {path}"
        lines = code.splitlines(keepends=True)
        summary = [f"Class: {name}"]
        doc = ast.get_docstring(node)
        if doc:
            summary.append(f" Docstring: {doc}")
        for body_node in node.body:
            if not isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            start = body_node.lineno - 1
            first_stmt = body_node.body[0]
            body_start = first_stmt.lineno - 1
            if body_start > start:
                sig = "".join(lines[start:body_start]).strip()
            else:
                # One-line method: the header is the part of the line before
                # the first body statement (col_offset is a UTF-8 byte offset).
                sig = lines[start].encode("utf-8")[:first_stmt.col_offset].decode("utf-8").strip()
            summary.append(f" - {sig}")
        return "\n".join(summary)
    except Exception as e:
        return f"ERROR summarizing class '{name}' in '{path}': {e}"
def py_get_var_declaration(path: str, name: str) -> str:
    """
    Return the source line(s) of a variable assignment in a .py file.

    `name` is resolved with _get_symbol_node() and must refer to a plain or
    annotated assignment. Returns the text from the node's first to last
    line, or an error string.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    is_python = target.is_file() and target.suffix == ".py"
    if not is_python:
        return f"ERROR: not a python file: {path}"
    try:
        # Drop a leading BOM before parsing.
        source = target.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        source_lines = source.splitlines(keepends=True)
        found = _get_symbol_node(ast.parse(source), name)
        if not found or not isinstance(found, (ast.Assign, ast.AnnAssign)):
            return f"ERROR: could not find variable '{name}' in {path}"
        first_idx = cast(int, getattr(found, "lineno")) - 1
        stop_idx = cast(int, getattr(found, "end_lineno"))
        return "".join(source_lines[first_idx:stop_idx])
    except Exception as exc:
        return f"ERROR retrieving variable '{name}' from '{path}': {exc}"
def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str:
    """
    Replace the source line(s) of a variable assignment in a .py file.

    `name` must resolve to a plain or annotated assignment; its line range
    is rewritten through set_file_slice(), whose result message is returned.
    Returns an error string on failure.
    """
    target, denied = _resolve_and_check(path)
    if denied:
        return denied
    assert target is not None
    is_python = target.is_file() and target.suffix == ".py"
    if not is_python:
        return f"ERROR: not a python file: {path}"
    try:
        # Drop a leading BOM before parsing.
        source = target.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        found = _get_symbol_node(ast.parse(source), name)
        if not found or not isinstance(found, (ast.Assign, ast.AnnAssign)):
            return f"ERROR: could not find variable '{name}' in {path}"
        first_line = cast(int, getattr(found, "lineno"))
        last_line = cast(int, getattr(found, "end_lineno"))
        return set_file_slice(path, first_line, last_line, new_declaration)
    except Exception as exc:
        return f"ERROR updating variable '{name}' in '{path}': {exc}"
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
    """
    Returns the git diff for a file or directory.

    base_rev: The base revision (default: HEAD)
    head_rev: The head revision (optional)

    Revisions beginning with '-' are rejected: they would be parsed by git
    as command-line options (for example --output=<file>) rather than as
    revisions. Returns the diff text, "(no changes)", or an error string.
    """
    # Validate before touching the filesystem; these values come from the
    # tool caller and end up directly on git's command line.
    for label, rev in (("base_rev", base_rev), ("head_rev", head_rev)):
        if rev.startswith("-"):
            return f"ERROR: invalid {label} '{rev}': revisions must not start with '-'"
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    cmd = ["git", "diff", base_rev]
    if head_rev:
        cmd.append(head_rev)
    cmd.extend(["--", str(p)])
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8")
        return result.stdout if result.stdout else "(no changes)"
    except subprocess.CalledProcessError as e:
        return f"ERROR running git diff: {e.stderr}"
    except Exception as e:
        return f"ERROR: {e}"
def py_find_usages(path: str, name: str) -> str:
    """
    Finds whole-word matches of a symbol in a given file or directory.

    A directory is walked recursively, skipping dot-directories,
    __pycache__, venv and env, and only .py/.md/.toml/.txt/.json files are
    searched. Every file is re-checked with _is_allowed(). Results are
    reported as "<path>:<line>: <text>", with the path relative to the
    primary base dir when possible and absolute otherwise. At most 100
    results are returned.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    try:
        import re
        pattern = re.compile(r"\b" + re.escape(name) + r"\b")
        display_root = _primary_base_dir if _primary_base_dir else Path.cwd()
        results: list[str] = []

        def _search_file(fp: Path) -> None:
            lowered = fp.name.lower()
            if lowered == "history.toml" or lowered.endswith("_history.toml"):
                return
            if not _is_allowed(fp):
                return
            try:
                text = fp.read_text(encoding="utf-8")
            except (OSError, ValueError):
                # Best effort: unreadable or non-UTF-8 files are skipped.
                return
            try:
                shown = fp.relative_to(display_root)
            except ValueError:
                # File lives under a secondary base dir; report it with its
                # full path instead of silently dropping its matches.
                shown = fp
            for i, line in enumerate(text.splitlines(), 1):
                if pattern.search(line):
                    results.append(f"{shown}:{i}: {line.strip()[:100]}")

        if p.is_file():
            _search_file(p)
        else:
            for root, dirs, files in os.walk(p):
                # Prune in place so os.walk does not descend into these.
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
                for file in files:
                    if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
                        _search_file(Path(root) / file)
        if not results:
            return f"No usages found for '{name}' in {p}"
        if len(results) > 100:
            return "\n".join(results[:100]) + f"\n... (and {len(results)-100} more)"
        return "\n".join(results)
    except Exception as e:
        return f"ERROR finding usages for '{name}': {e}"
def py_get_imports(path: str) -> str:
    """
    Parses a file's AST and returns a list of its module-level imports.

    `import a.b` is reported as "a.b"; `from m import x` as "m.x".
    Relative imports keep their leading dots (`from . import x` -> ".x",
    `from ..pkg import y` -> "..pkg.y"). Only statements directly in the
    module body are considered. Returns an error string on failure.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.is_file() or p.suffix != ".py":
        return f"ERROR: not a python file: {path}"
    try:
        # Strip a leading BOM, as the other AST tools in this module do;
        # ast.parse() rejects it when given a str.
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        tree = ast.parse(code)
        imports = []
        for node in tree.body:
            if isinstance(node, ast.Import):
                imports.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                module = "." * node.level + (node.module or "")
                # Avoid a doubled dot after a bare relative prefix ("." / "..").
                joiner = "" if (not module or module.endswith(".")) else "."
                for alias in node.names:
                    imports.append(f"{module}{joiner}{alias.name}")
        if not imports:
            return "No imports found."
        return "Imports:\n" + "\n".join(f" - {i}" for i in imports)
    except Exception as e:
        return f"ERROR getting imports for '{path}': {e}"
def py_check_syntax(path: str) -> str:
    """
    Runs a quick syntax check on a Python file.

    The file is parsed with ast.parse(). Returns "Syntax OK: <path>", a
    description of the SyntaxError (line, offset, message, offending text),
    or an error string for any other failure.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.is_file() or p.suffix != ".py":
        return f"ERROR: not a python file: {path}"
    try:
        # A UTF-8 BOM is legal at the start of a source file but makes
        # ast.parse() fail on a str; strip it so such files are not
        # reported as broken.
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        ast.parse(code)
        return f"Syntax OK: {path}"
    except SyntaxError as e:
        return f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}"
    except Exception as e:
        return f"ERROR checking syntax for '{path}': {e}"
def py_get_hierarchy(path: str, class_name: str) -> str:
    """
    Scans a file or directory tree to find direct subclasses of a class.

    A class counts as a subclass when one of its bases is the bare name
    `class_name` or an attribute access ending in it (e.g. `mod.Base`).
    Directories are walked recursively, skipping dot-directories,
    __pycache__, venv and env. Files that are not allowed or cannot be
    read/parsed are skipped silently.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    subclasses: list[str] = []

    def _search_file(fp: Path) -> None:
        if not _is_allowed(fp):
            return
        try:
            # Strip a leading BOM so such files are parsed instead of skipped.
            code = fp.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
            tree = ast.parse(code)
        except Exception:
            # Best effort: one unreadable file must not abort the scan.
            return
        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            for base in node.bases:
                if isinstance(base, ast.Name) and base.id == class_name:
                    subclasses.append(f"{fp.name}: class {node.name}({class_name})")
                elif isinstance(base, ast.Attribute) and base.attr == class_name:
                    if isinstance(base.value, ast.Name):
                        subclasses.append(f"{fp.name}: class {node.name}({base.value.id}.{class_name})")

    try:
        if p.is_file():
            _search_file(p)
        else:
            for root, dirs, files in os.walk(p):
                # Prune in place so os.walk does not descend into these.
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
                for file in files:
                    if file.endswith('.py'):
                        _search_file(Path(root) / file)
        if not subclasses:
            return f"No subclasses of '{class_name}' found in {p}"
        return f"Subclasses of '{class_name}':\n" + "\n".join(f" - {s}" for s in subclasses)
    except Exception as e:
        return f"ERROR finding subclasses of '{class_name}': {e}"
def py_get_docstring(path: str, name: str) -> str:
    """
    Extracts the docstring for a specific module, class, or function.

    An empty name or the name "module" selects the module docstring;
    otherwise `name` is resolved with _get_symbol_node(). Returns the
    docstring, a "No docstring found" message, or an error string.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.is_file() or p.suffix != ".py":
        return f"ERROR: not a python file: {path}"
    try:
        # Strip a leading BOM, as the other AST tools in this module do;
        # ast.parse() rejects it when given a str.
        code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
        tree = ast.parse(code)
        if not name or name == "module":
            doc = ast.get_docstring(tree)
            return doc if doc else "No module docstring found."
        node = _get_symbol_node(tree, name)
        if not node:
            return f"ERROR: could not find symbol '{name}' in {path}"
        # ast.get_docstring() only accepts these node types; assignments
        # found by name fall through to the generic message below.
        if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module)):
            doc = ast.get_docstring(node)
            return doc if doc else f"No docstring found for '{name}'."
        return f"No docstring found for '{name}'."
    except Exception as e:
        return f"ERROR getting docstring for '{name}': {e}"
def get_tree(path: str, max_depth: int = 2) -> str:
    """
    Returns a directory structure up to a max depth.

    The first line is "<dirname>/"; entries follow with box-drawing
    connectors, directories before files. Children are indented under
    their parent, with a vertical guide while the parent has later
    siblings. Dot-entries, __pycache__, venv, env and history files are
    omitted; unreadable directories are shown without children.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    assert p is not None
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"

    def _visible(entry: Path) -> bool:
        lowered = entry.name.lower()
        if entry.name.startswith('.') or entry.name in ('__pycache__', 'venv', 'env'):
            return False
        # Case-insensitive, matching the blacklist used by the other tools.
        return not (lowered == "history.toml" or lowered.endswith("_history.toml"))

    def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]:
        if current_depth > max_depth:
            return []
        try:
            entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
        except PermissionError:
            return []
        entries = [e for e in entries if _visible(e)]
        lines: list[str] = []
        last_index = len(entries) - 1
        for i, entry in enumerate(entries):
            is_last = i == last_index
            connector = "└── " if is_last else "├── "
            lines.append(f"{prefix}{connector}{entry.name}")
            if entry.is_dir():
                # Both variants are four columns wide so nested levels line
                # up; without this, children of a non-last directory were
                # printed at the same level as the directory itself.
                extension = "    " if is_last else "│   "
                lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
        return lines

    try:
        tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
        return "\n".join(tree_lines)
    except Exception as e:
        return f"ERROR generating tree for '{path}': {e}"
# ------------------------------------------------------------------ web tools
class _DDGParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.results: list[dict[str, str]] = []
self.in_result: bool = False
self.in_title: bool = False
self.in_snippet: bool = False
self.current_link: str = ""
self.current_title: str = ""
self.current_snippet: str = ""
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
attrs_dict = dict(attrs)
if tag == "a" and "result__url" in cast(str, attrs_dict.get("class", "")):
self.current_link = cast(str, attrs_dict.get("href", ""))
if tag == "a" and "result__snippet" in cast(str, attrs_dict.get("class", "")):
self.in_snippet = True
if tag == "h2" and "result__title" in cast(str, attrs_dict.get("class", "")):
self.in_title = True
def handle_endtag(self, tag: str) -> None:
if tag == "a" and self.in_snippet:
self.in_snippet = False
if tag == "h2" and self.in_title:
self.in_title = False
if self.current_link:
self.results.append({
"title": self.current_title.strip(),
"link": self.current_link,
"snippet": self.current_snippet.strip()
})
self.current_title = ""
self.current_snippet = ""
self.current_link = ""
def handle_data(self, data: str) -> None:
if self.in_title:
self.current_title += data
if self.in_snippet:
self.current_snippet += data
class _TextExtractor(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.text: list[str] = []
self.hide: int = 0
self.ignore_tags: set[str] = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'}
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag in self.ignore_tags:
self.hide += 1
def handle_endtag(self, tag: str) -> None:
if tag in self.ignore_tags:
self.hide -= 1
def handle_data(self, data: str) -> None:
if self.hide == 0:
cleaned = data.strip()
if cleaned:
self.text.append(cleaned)
def web_search(query: str) -> str:
    """
    Query DuckDuckGo's HTML endpoint and format the first five results.

    Each result is rendered as a numbered title followed by its URL and
    snippet. Returns a "No results" message when nothing was parsed, or an
    error string when the request fails.
    """
    endpoint = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
    request = urllib.request.Request(endpoint, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            page = response.read().decode('utf-8', errors='ignore')
        extractor = _DDGParser()
        extractor.feed(page)
        hits = extractor.results
        if not hits:
            return f"No results found for '{query}'"
        header = [f"Search Results for '{query}':"]
        entries = [
            f"{rank}. {hit['title']}\nURL: {hit['link']}\nSnippet: {hit['snippet']}\n"
            for rank, hit in enumerate(hits[:5], 1)
        ]
        return "\n".join(header + entries)
    except Exception as exc:
        return f"ERROR searching web for '{query}': {exc}"
def fetch_url(url: str) -> str:
    """
    Fetch a URL and return its text content (stripped of HTML tags).

    DuckDuckGo redirect links ("//duckduckgo.com/l/?uddg=...") are unwrapped
    to their target first. URLs without an http:// or https:// scheme get
    "https://" prepended. The body is decoded using the charset declared in
    the response headers (UTF-8 when absent or unknown), reduced to text
    with _TextExtractor, whitespace-collapsed, and truncated to 40,000
    characters. Returns the text, a "FETCH OK" notice when no text was
    found, or an error string.
    """
    # Correct duckduckgo redirect links if passed
    if url.startswith("//duckduckgo.com/l/?uddg="):
        split_uddg = url.split("uddg=")
        if len(split_uddg) > 1:
            url = urllib.parse.unquote(split_uddg[1].split("&")[0])
    # Test for the full scheme: a bare host such as "httpbin.org" also
    # starts with "http" but still needs one.
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read()
            charset = resp.headers.get_content_charset() or 'utf-8'
        try:
            html = raw.decode(charset, errors='ignore')
        except LookupError:
            # The server declared a charset Python does not know.
            html = raw.decode('utf-8', errors='ignore')
        parser = _TextExtractor()
        parser.feed(html)
        full_text = " ".join(parser.text)
        full_text = _re.sub(r'\s+', ' ', full_text)
        if not full_text.strip():
            return f"FETCH OK: No readable text extracted from {url}. The page might be empty, JavaScript-heavy, or blocked."
        # Limit to 40k chars to prevent context blowup
        if len(full_text) > 40000:
            return full_text[:40000] + "\n... (content truncated)"
        return full_text
    except Exception as e:
        return f"ERROR fetching URL '{url}': {e}"
def get_ui_performance() -> str:
    """
    Report the metrics returned by the injected perf_monitor_callback.

    Returns an informational message when no callback is installed, the
    metrics dict rendered as text with braces and single quotes removed
    otherwise, or an error string when the callback raises.
    """
    if perf_monitor_callback is None:
        return "INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running."
    try:
        snapshot = perf_monitor_callback()
        # Delete '{', '}' and "'" in a single pass to flatten the dict repr.
        rendered = str(snapshot).translate(str.maketrans("", "", "{}'"))
        return f"UI Performance Snapshot:\n{rendered}"
    except Exception as exc:
        return f"ERROR: Failed to retrieve UI performance: {str(exc)}"
# ------------------------------------------------------------------ tool dispatch
class StdioMCPServer:
    """
    Client for one external MCP server spoken to over stdin/stdout.

    The server is started as a subprocess from `config.command` and
    `config.args`. Requests are newline-delimited JSON-RPC 2.0 objects
    written to its stdin; responses are read line by line from its stdout.
    Anything the server writes to stderr is echoed with a
    "[MCP:<name>:err]" prefix.

    `status` is one of 'idle', 'starting', 'running'. `tools` maps tool
    name to the tool description returned by the server's 'tools/list'.
    """
    def __init__(self, config: "models.MCPServerConfig"):
        self.config = config
        self.name = config.name
        self.proc = None
        self.tools = {}
        self._id_counter = 0
        self._pending_requests = {}
        self.status = 'idle'
        # Strong reference to the stderr pump; the event loop only keeps
        # weak references to tasks, so an unreferenced one may be collected.
        self._stderr_task = None

    def _get_id(self):
        """Return the next request id (1, 2, 3, ...)."""
        self._id_counter += 1
        return self._id_counter

    async def start(self):
        """Spawn the server process, start echoing its stderr, and load its tool list."""
        self.status = 'starting'
        self.proc = await asyncio.create_subprocess_exec(
            self.config.command,
            *self.config.args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        self._stderr_task = asyncio.create_task(self._read_stderr())
        await self.list_tools()
        self.status = 'running'

    async def stop(self):
        """Close stdin, terminate the process, and return to 'idle'. Errors are ignored."""
        task, self._stderr_task = self._stderr_task, None
        if self.proc:
            try:
                if self.proc.stdin:
                    self.proc.stdin.close()
                    await self.proc.stdin.wait_closed()
            except Exception:
                pass
            try:
                self.proc.terminate()
                await self.proc.wait()
            except Exception:
                pass
        if task is not None:
            # The pump normally ends by itself at stderr EOF; cancel it in
            # case it has not, and wait so it is not left pending.
            try:
                task.cancel()
                await asyncio.gather(task, return_exceptions=True)
            except Exception:
                pass
        self.proc = None
        self.status = 'idle'

    async def _read_stderr(self):
        """Echo the server's stderr line by line until it reaches EOF."""
        proc = self.proc
        if proc is None or proc.stderr is None:
            return
        while True:
            line = await proc.stderr.readline()
            if not line:
                # readline() returns b'' only at EOF. Looping on it (or on
                # stdout's EOF state) would spin without ever yielding.
                break
            print(f'[MCP:{self.name}:err] {line.decode(errors="replace").strip()}')

    async def _send_request(self, method: str, params: Optional[dict] = None):
        """
        Send one JSON-RPC request and return the `result` of its response.

        Lines on stdout that are not JSON, or whose `id` differs from this
        request's id (for example server notifications), are skipped.
        Returns None when stdout closes before a matching response arrives
        or when the response carries no `result`.
        """
        req_id = self._get_id()
        request = {
            'jsonrpc': '2.0',
            'id': req_id,
            'method': method,
            'params': params or {}
        }
        self.proc.stdin.write(json.dumps(request).encode() + b'\n')
        await self.proc.stdin.drain()
        while True:
            line = await self.proc.stdout.readline()
            if not line:
                return None
            try:
                resp = json.loads(line.decode())
            except ValueError:
                # Covers both invalid JSON and undecodable bytes.
                continue
            if isinstance(resp, dict) and resp.get('id') == req_id:
                return resp.get('result')

    async def list_tools(self):
        """Request 'tools/list', merge the returned tools into `tools`, and return it."""
        result = await self._send_request('tools/list')
        if result and 'tools' in result:
            for t in result['tools']:
                self.tools[t['name']] = t
        return self.tools

    async def call_tool(self, name: str, arguments: dict):
        """
        Invoke a tool on the server.

        Returns the newline-joined `text` of all content items of type
        'text', or str(result) when the response has no `content`.
        """
        result = await self._send_request('tools/call', {'name': name, 'arguments': arguments})
        if result and 'content' in result:
            return '\n'.join([c.get('text', '') for c in result['content'] if c.get('type') == 'text'])
        return str(result)
class ExternalMCPManager:
    """Registry of external MCP servers, keyed by their configured name."""

    def __init__(self):
        self.servers = {}  # server name -> started server object

    async def add_server(self, config: models.MCPServerConfig):
        """Start a stdio server for ``config`` and register it under ``config.name``.

        Configs with a truthy ``url`` are ignored for now (remote servers are
        not implemented). If a server is already registered under the same
        name it is stopped first, so its child process is not orphaned when
        the entry is replaced.
        """
        if config.url:
            # RemoteMCPServer placeholder
            return
        previous = self.servers.pop(config.name, None)
        if previous is not None:
            await previous.stop()
        server = StdioMCPServer(config)
        await server.start()
        self.servers[config.name] = server

    async def stop_all(self):
        """Stop every registered server and clear the registry."""
        # Detach the registry before awaiting so a concurrent add_server()
        # cannot resize the dict while it is being iterated.
        servers, self.servers = self.servers, {}
        for server in servers.values():
            await server.stop()

    def get_all_tools(self) -> dict:
        """Return tool name -> descriptor, each tagged with 'server' and 'server_status'.

        When two servers expose the same tool name, the one registered later wins.
        """
        return {
            tname: {**tool, 'server': sname, 'server_status': server.status}
            for sname, server in self.servers.items()
            for tname, tool in server.tools.items()
        }

    def get_servers_status(self) -> dict[str, str]:
        """Return server name -> current status string."""
        return {name: server.status for name, server in self.servers.items()}

    async def async_dispatch(self, tool_name: str, tool_input: dict) -> str:
        """Call ``tool_name`` on the first registered server that exposes it.

        Returns:
            The tool's text output, or an ``Error:`` message when no server
            offers the tool.
        """
        for server in self.servers.values():
            if tool_name in server.tools:
                return await server.call_tool(tool_name, tool_input)
        return f'Error: External tool {tool_name} not found.'
# Single module-level manager instance handed out by get_external_mcp_manager().
_external_mcp_manager = ExternalMCPManager()


def get_external_mcp_manager() -> ExternalMCPManager:
    """Return the module-level ExternalMCPManager instance."""
    # Read-only access to the module global, so no `global` statement is needed.
    return _external_mcp_manager
# Names of every tool that dispatch() handles natively.
TOOL_NAMES: set[str] = {
    # File I/O
    "read_file",
    "list_directory",
    "search_files",
    "get_file_summary",
    "get_tree",
    # Line-range and string edits
    "get_file_slice",
    "set_file_slice",
    "edit_file",
    # Python structure
    "py_get_skeleton",
    "py_get_code_outline",
    "py_get_definition",
    "py_update_definition",
    "py_get_signature",
    "py_set_signature",
    "py_get_class_summary",
    "py_get_var_declaration",
    "py_set_var_declaration",
    # Analysis
    "get_git_diff",
    "py_find_usages",
    "py_get_imports",
    "py_check_syntax",
    "py_get_hierarchy",
    "py_get_docstring",
    # Network
    "web_search",
    "fetch_url",
    # Runtime
    "get_ui_performance",
}
# Integer-typed arguments per tool, mapped to the default used when the key is absent.
_DISPATCH_INT_ARGS: dict[str, dict[str, int]] = {
    "get_file_slice": {"start_line": 1, "end_line": 1},
    "set_file_slice": {"start_line": 1, "end_line": 1},
    "get_tree": {"max_depth": 2},
}

# String spellings that must count as boolean false; bool("false") would be True.
_DISPATCH_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off", "none", "null"})


def _dispatch_bool(value: Any) -> bool:
    """Coerce a tool argument to bool, honouring textual false values like "false"."""
    if isinstance(value, str):
        return value.strip().lower() not in _DISPATCH_FALSE_STRINGS
    return bool(value)


# Tool name -> callable(path, args). Lambdas defer the lookup of the tool
# function to call time. Integer arguments listed in _DISPATCH_INT_ARGS have
# already been validated and converted by dispatch() when a handler runs.
_DISPATCH_HANDLERS: dict[str, Any] = {
    "read_file": lambda path, args: read_file(path),
    "list_directory": lambda path, args: list_directory(path),
    "search_files": lambda path, args: search_files(path, str(args.get("pattern", "*"))),
    "get_file_summary": lambda path, args: get_file_summary(path),
    "py_get_skeleton": lambda path, args: py_get_skeleton(path),
    "py_get_code_outline": lambda path, args: py_get_code_outline(path),
    "py_get_definition": lambda path, args: py_get_definition(path, str(args.get("name", ""))),
    "py_update_definition": lambda path, args: py_update_definition(
        path, str(args.get("name", "")), str(args.get("new_content", ""))
    ),
    "py_get_signature": lambda path, args: py_get_signature(path, str(args.get("name", ""))),
    "py_set_signature": lambda path, args: py_set_signature(
        path, str(args.get("name", "")), str(args.get("new_signature", ""))
    ),
    "py_get_class_summary": lambda path, args: py_get_class_summary(path, str(args.get("name", ""))),
    "py_get_var_declaration": lambda path, args: py_get_var_declaration(path, str(args.get("name", ""))),
    "py_set_var_declaration": lambda path, args: py_set_var_declaration(
        path, str(args.get("name", "")), str(args.get("new_declaration", ""))
    ),
    "get_file_slice": lambda path, args: get_file_slice(path, args["start_line"], args["end_line"]),
    "set_file_slice": lambda path, args: set_file_slice(
        path, args["start_line"], args["end_line"], str(args.get("new_content", ""))
    ),
    "get_git_diff": lambda path, args: get_git_diff(
        path, str(args.get("base_rev", "HEAD")), str(args.get("head_rev", ""))
    ),
    "edit_file": lambda path, args: edit_file(
        path,
        str(args.get("old_string", "")),
        str(args.get("new_string", "")),
        _dispatch_bool(args.get("replace_all", False)),
    ),
    "web_search": lambda path, args: web_search(str(args.get("query", ""))),
    "fetch_url": lambda path, args: fetch_url(str(args.get("url", ""))),
    "get_ui_performance": lambda path, args: get_ui_performance(),
    "py_find_usages": lambda path, args: py_find_usages(path, str(args.get("name", ""))),
    "py_get_imports": lambda path, args: py_get_imports(path),
    "py_check_syntax": lambda path, args: py_check_syntax(path),
    "py_get_hierarchy": lambda path, args: py_get_hierarchy(path, str(args.get("class_name", ""))),
    "py_get_docstring": lambda path, args: py_get_docstring(path, str(args.get("name", ""))),
    "get_tree": lambda path, args: get_tree(path, args["max_depth"]),
}


def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
    """
    Dispatch an MCP tool call by name. Returns the result as a string.

    Args:
        tool_name: Name of the native tool to run.
        tool_input: Raw argument mapping supplied by the model. The target
            path may be given as "path", "file_path" or "dir_path".

    Returns:
        The tool's output, or an "ERROR: ..." string when the tool name is
        unknown or an integer argument cannot be converted. Malformed
        integer arguments are reported this way rather than raised, so bad
        model input yields a message the model can act on.
    """
    handler = _DISPATCH_HANDLERS.get(tool_name)
    if handler is None:
        return f"ERROR: unknown MCP tool '{tool_name}'"
    # Handle aliases
    path = str(tool_input.get("path", tool_input.get("file_path", tool_input.get("dir_path", ""))))
    # Work on a copy so the caller's mapping is never mutated by coercion.
    args = dict(tool_input)
    for key, default in _DISPATCH_INT_ARGS.get(tool_name, {}).items():
        raw = args.get(key, default)
        try:
            args[key] = int(raw)
        except (TypeError, ValueError, OverflowError):
            return f"ERROR: argument '{key}' for tool '{tool_name}' must be an integer, got {raw!r}"
    return handler(path, args)
async def async_dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
    """Route a tool call to a native tool or to an external MCP server.

    Native tools (those listed in MCP_TOOL_SPECS) take precedence and run
    through the synchronous dispatch() via asyncio.to_thread. Anything else is
    looked up among the tools exposed by the external MCP manager.

    Returns:
        The tool's output, or an ``ERROR:`` string when neither side knows
        the tool.
    """
    # Check native tools
    if any(spec['name'] == tool_name for spec in MCP_TOOL_SPECS):
        return await asyncio.to_thread(dispatch, tool_name, tool_input)
    # Check external tools
    manager = get_external_mcp_manager()
    if tool_name in manager.get_all_tools():
        return await manager.async_dispatch(tool_name, tool_input)
    return f'ERROR: unknown MCP tool {tool_name}'
def get_tool_schemas() -> list[dict[str, Any]]:
    """Return the native tool specs followed by one schema per external tool.

    External entries are reduced to 'name', 'description' and 'parameters';
    a missing 'inputSchema' falls back to an empty object schema. The result
    is a new list, so MCP_TOOL_SPECS itself is not extended.
    """
    schemas = list(MCP_TOOL_SPECS)
    external_tools = get_external_mcp_manager().get_all_tools()
    schemas.extend(
        {
            'name': tool_name,
            'description': info.get('description', ''),
            'parameters': info.get('inputSchema', {'type': 'object', 'properties': {}}),
        }
        for tool_name, info in external_tools.items()
    )
    return schemas
# ------------------------------------------------------------------ tool schema helpers
# These are imported by ai_client.py to build provider-specific declarations.
def _mcp_param(kind: str, description: str) -> dict[str, str]:
    """Build one JSON-schema property entry: {"type": ..., "description": ...}."""
    return {"type": kind, "description": description}


def _mcp_tool_spec(name: str, description: str, properties: dict[str, Any], required: Any = None) -> dict[str, Any]:
    """Build one tool spec; the "required" key is omitted when ``required`` is None."""
    parameters: dict[str, Any] = {"type": "object", "properties": properties}
    if required is not None:
        parameters["required"] = required
    return {"name": name, "description": description, "parameters": parameters}


# One entry per native tool: name, description, and a JSON-schema style
# "parameters" object describing the arguments dispatch() reads.
MCP_TOOL_SPECS: list[dict[str, Any]] = [
    _mcp_tool_spec(
        "read_file",
        "Read the full UTF-8 content of a file within the allowed project paths. "
        "Use get_file_summary first to decide whether you need the full content.",
        {"path": _mcp_param("string", "Absolute or relative path to the file to read.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "list_directory",
        "List files and subdirectories within an allowed directory. "
        "Shows name, type (file/dir), and size. Use this to explore the project structure.",
        {"path": _mcp_param("string", "Absolute path to the directory to list.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "search_files",
        "Search for files matching a glob pattern within an allowed directory. "
        "Supports recursive patterns like '**/*.py'. "
        "Use this to find files by extension or name pattern.",
        {
            "path": _mcp_param("string", "Absolute path to the directory to search within."),
            "pattern": _mcp_param("string", "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'."),
        },
        ["path", "pattern"],
    ),
    _mcp_tool_spec(
        "get_file_summary",
        "Get a compact heuristic summary of a file without reading its full content. "
        "For Python: imports, classes, methods, functions, constants. "
        "For TOML: table keys. For Markdown: headings. Others: line count + preview. "
        "Use this before read_file to decide if you need the full content.",
        {"path": _mcp_param("string", "Absolute or relative path to the file to summarise.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "py_get_skeleton",
        "Get a skeleton view of a Python file. "
        "This returns all classes and function signatures with their docstrings, "
        "but replaces function bodies with '...'. "
        "Use this to understand module interfaces without reading the full implementation.",
        {"path": _mcp_param("string", "Path to the .py file.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "py_get_code_outline",
        "Get a hierarchical outline of a code file. "
        "This returns classes, functions, and methods with their line ranges and brief docstrings. "
        "Use this to quickly map out a file's structure before reading specific sections.",
        {"path": _mcp_param("string", "Path to the code file (currently supports .py).")},
        ["path"],
    ),
    _mcp_tool_spec(
        "get_file_slice",
        "Read a specific line range from a file. Useful for reading parts of very large files.",
        {
            "path": _mcp_param("string", "Path to the file."),
            "start_line": _mcp_param("integer", "1-based start line number."),
            "end_line": _mcp_param("integer", "1-based end line number (inclusive)."),
        },
        ["path", "start_line", "end_line"],
    ),
    _mcp_tool_spec(
        "set_file_slice",
        "Replace a specific line range in a file with new content. Surgical edit tool.",
        {
            "path": _mcp_param("string", "Path to the file."),
            "start_line": _mcp_param("integer", "1-based start line number."),
            "end_line": _mcp_param("integer", "1-based end line number (inclusive)."),
            "new_content": _mcp_param("string", "New content to insert."),
        },
        ["path", "start_line", "end_line", "new_content"],
    ),
    _mcp_tool_spec(
        "edit_file",
        "Replace exact string match in a file. Preserves indentation and line endings. Drop-in replacement for native edit tool.",
        {
            "path": _mcp_param("string", "Path to the file."),
            "old_string": _mcp_param("string", "The text to replace."),
            "new_string": _mcp_param("string", "The replacement text."),
            "replace_all": _mcp_param("boolean", "Replace all occurrences. Default false."),
        },
        ["path", "old_string", "new_string"],
    ),
    _mcp_tool_spec(
        "py_get_definition",
        "Get the full source code of a specific class, function, or method definition. "
        "This is more efficient than reading the whole file if you know what you're looking for.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param(
                "string",
                "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.",
            ),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "py_update_definition",
        "Surgically replace the definition of a class or function in a Python file using AST to find line ranges.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of class/function/method."),
            "new_content": _mcp_param("string", "Complete new source for the definition."),
        },
        ["path", "name", "new_content"],
    ),
    _mcp_tool_spec(
        "py_get_signature",
        "Get only the signature part of a Python function or method (from def until colon).",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of the function/method (e.g. 'ClassName.method_name')."),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "py_set_signature",
        "Surgically replace only the signature of a Python function or method.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of the function/method."),
            "new_signature": _mcp_param(
                "string", "Complete new signature string (including def and trailing colon)."
            ),
        },
        ["path", "name", "new_signature"],
    ),
    _mcp_tool_spec(
        "py_get_class_summary",
        "Get a summary of a Python class, listing its docstring and all method signatures.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of the class."),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "py_get_var_declaration",
        "Get the assignment/declaration line for a variable.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of the variable."),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "py_set_var_declaration",
        "Surgically replace a variable assignment/declaration.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of the variable."),
            "new_declaration": _mcp_param("string", "Complete new assignment/declaration string."),
        },
        ["path", "name", "new_declaration"],
    ),
    _mcp_tool_spec(
        "get_git_diff",
        "Returns the git diff for a file or directory. "
        "Use this to review changes efficiently without reading entire files.",
        {
            "path": _mcp_param("string", "Path to the file or directory."),
            "base_rev": _mcp_param(
                "string", "Base revision (e.g. 'HEAD', 'HEAD~1', or a commit hash). Defaults to 'HEAD'."
            ),
            "head_rev": _mcp_param("string", "Head revision (optional)."),
        },
        ["path"],
    ),
    _mcp_tool_spec(
        "web_search",
        "Search the web using DuckDuckGo. Returns the top 5 search results with titles, URLs, and snippets. Chain this with fetch_url to read specific pages.",
        {"query": _mcp_param("string", "The search query.")},
        ["query"],
    ),
    _mcp_tool_spec(
        "fetch_url",
        "Fetch the full text content of a URL (stripped of HTML tags). Use this after web_search to read relevant information from the web.",
        {"url": _mcp_param("string", "The full URL to fetch.")},
        ["url"],
    ),
    # Takes no arguments, so this spec carries no "required" list.
    _mcp_tool_spec(
        "get_ui_performance",
        "Get a snapshot of the current UI performance metrics, including FPS, Frame Time (ms), CPU usage (%), and Input Lag (ms). Use this to diagnose UI slowness or verify that your changes haven't degraded the user experience.",
        {},
    ),
    _mcp_tool_spec(
        "py_find_usages",
        "Finds exact string matches of a symbol in a given file or directory.",
        {
            "path": _mcp_param("string", "Path to file or directory to search."),
            "name": _mcp_param("string", "The symbol/string to search for."),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "py_get_imports",
        "Parses a file's AST and returns a strict list of its dependencies.",
        {"path": _mcp_param("string", "Path to the .py file.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "py_check_syntax",
        "Runs a quick syntax check on a Python file.",
        {"path": _mcp_param("string", "Path to the .py file.")},
        ["path"],
    ),
    _mcp_tool_spec(
        "py_get_hierarchy",
        "Scans the project to find subclasses of a given class.",
        {
            "path": _mcp_param("string", "Directory path to search in."),
            "class_name": _mcp_param("string", "Name of the base class."),
        },
        ["path", "class_name"],
    ),
    _mcp_tool_spec(
        "py_get_docstring",
        "Extracts the docstring for a specific module, class, or function.",
        {
            "path": _mcp_param("string", "Path to the .py file."),
            "name": _mcp_param("string", "Name of symbol or 'module' for the file docstring."),
        },
        ["path", "name"],
    ),
    _mcp_tool_spec(
        "get_tree",
        "Returns a directory structure up to a max depth.",
        {
            "path": _mcp_param("string", "Directory path."),
            "max_depth": _mcp_param("integer", "Maximum depth to recurse (default 2)."),
        },
        ["path"],
    ),
]