# mcp_client.py """ MCP Client - Multi-tool filesystem and network operations with sandboxing. This module implements a Model Context Protocol (MCP)-like interface for AI agents to interact with the filesystem and network. It provides 26 tools with a three-layer security model to prevent unauthorized access. Three-Layer Security Model: 1. Allowlist Construction (configure()): - Builds _allowed_paths from project file_items - Populates _base_dirs from file parents and extra_base_dirs - Sets _primary_base_dir for relative path resolution 2. Path Validation (_is_allowed()): - Blacklist check: history.toml, *_history.toml, config, credentials - Explicit allowlist check: _allowed_paths membership - CWD fallback: allows cwd() subpaths if no base_dirs configured - Base directory containment: must be subpath of _base_dirs 3. Resolution Gate (_resolve_and_check()): - Converts relative paths using _primary_base_dir - Resolves symlinks to prevent traversal attacks - Returns (resolved_path, error_message) tuple Tool Categories: - File I/O: read_file, list_directory, search_files, get_tree - Surgical Edits: set_file_slice, edit_file - AST-Based (Python): py_get_skeleton, py_get_code_outline, py_get_definition, py_update_definition, py_get_signature, py_set_signature, py_get_class_summary, py_get_var_declaration, py_set_var_declaration - Analysis: get_file_summary, get_git_diff, py_find_usages, py_get_imports, py_check_syntax, py_get_hierarchy, py_get_docstring - Network: web_search, fetch_url - Runtime: get_ui_performance Mutating Tools: The MUTATING_TOOLS frozenset defines tools that modify files. ai_client.py checks this set and routes to pre_tool_callback (GUI approval) if present. Thread Safety: This module uses module-level global state (_allowed_paths, _base_dirs). Call configure() before dispatch() in multi-threaded environments. See Also: - docs/guide_tools.md for complete tool inventory and security model - src/ai_client.py for tool dispatch integration - src/shell_runner.py for PowerShell execution """ # mcp_client.py #MCP-style file context tools for manual_slop. from __future__ import annotations import asyncio from pathlib import Path from typing import Optional, Callable, Any, cast import os import ast import subprocess from src import summarize from src import outline_tool import urllib.request import urllib.parse from html.parser import HTMLParser import re as _re # ------------------------------------------------------------------ mutating tools sentinel # Tools that write or modify files. ai_client checks this set before dispatch # and routes to pre_tool_callback (GUI approval) if the tool name is present. MUTATING_TOOLS: frozenset[str] = frozenset({ "set_file_slice", "py_update_definition", "py_set_signature", "py_set_var_declaration", "edit_file", }) # ------------------------------------------------------------------ state # Set by configure() before the AI send loop starts. # allowed_paths : set of resolved absolute Path objects (files or dirs) # base_dirs : set of resolved absolute Path dirs that act as roots _allowed_paths: set[Path] = set() _base_dirs: set[Path] = set() _primary_base_dir: Path | None = None # Injected by gui_2.py - returns a dict of performance metrics perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | None = None) -> None: """ Build the allowlist from aggregate file_items. Called by ai_client before each send so the list reflects the current project. file_items : list of dicts from aggregate.build_file_items() extra_base_dirs : additional directory roots to allow traversal of """ global _allowed_paths, _base_dirs, _primary_base_dir _allowed_paths = set() _base_dirs = set() _primary_base_dir = Path(extra_base_dirs[0]).resolve() if extra_base_dirs else Path.cwd() for item in file_items: p = item.get("path") if p is not None: try: rp = Path(p).resolve(strict=True) except (OSError, ValueError): rp = Path(p).resolve() _allowed_paths.add(rp) _base_dirs.add(rp.parent) if extra_base_dirs: for d in extra_base_dirs: dp = Path(d).resolve() if dp.is_dir(): _base_dirs.add(dp) def _is_allowed(path: Path) -> bool: """ Return True if `path` is within the allowlist. A path is allowed if: - it is explicitly in _allowed_paths, OR - it is contained within (or equal to) one of the _base_dirs All paths are resolved (follows symlinks) before comparison to prevent symlink-based path traversal. CRITICAL: Blacklisted files (history) are NEVER allowed. """ from src.paths import get_config_path from src.ai_client import get_credentials_path try: rp = path.resolve(strict=True) except (OSError, ValueError): rp = path.resolve() # Blacklist check by resolved path if rp == get_config_path().resolve(): return False if rp == get_credentials_path().resolve(): return False name = path.name.lower() if name == "history.toml" or name.endswith("_history.toml"): return False if rp in _allowed_paths: return True # Allow current working directory and subpaths by default if no base_dirs cwd = Path.cwd().resolve() if not _base_dirs: try: rp.relative_to(cwd) return True except ValueError: pass for bd in _base_dirs: try: rp.relative_to(bd) return True except ValueError: continue return False def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]: """ Resolve raw_path and verify it passes the allowlist check. Returns (resolved_path, error_string). error_string is empty on success. """ try: p = Path(raw_path) if not p.is_absolute() and _primary_base_dir: p = _primary_base_dir / p p = p.resolve() except Exception as e: return None, f"ERROR: invalid path '{raw_path}': {e}" if not _is_allowed(p): allowed_bases = "\\n".join([f" - {d}" for d in _base_dirs]) return None, ( f"ACCESS DENIED: '{raw_path}' resolves to '{p}', which is not within the allowed paths.\\n" f"Allowed base directories are:\\n{allowed_bases}" ) return p, "" # ------------------------------------------------------------------ tool implementations def read_file(path: str) -> str: """Return the UTF-8 content of a file, or an error string.""" p, err = _resolve_and_check(path) if err or p is None: return err if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: return p.read_text(encoding="utf-8") except Exception as e: return f"ERROR reading '{path}': {e}" def list_directory(path: str) -> str: """List entries in a directory. Returns a compact text table.""" p, err = _resolve_and_check(path) if err or p is None: return err if not p.exists(): return f"ERROR: path not found: {path}" if not p.is_dir(): return f"ERROR: not a directory: {path}" try: entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) lines = [f"Directory: {p}", ""] count = 0 for entry in entries: # Blacklist check name = entry.name.lower() if name == "history.toml" or name.endswith("_history.toml"): continue kind = "file" if entry.is_file() else "dir " size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else "" lines.append(f" [{kind}] {entry.name:<40} {size}") count += 1 lines.append(f" ({count} entries)") return "\n".join(lines) except Exception as e: return f"ERROR listing '{path}': {e}" def search_files(path: str, pattern: str) -> str: """ Search for files matching a glob pattern within path. pattern examples: '*.py', '**/*.toml', 'src/**/*.rs' """ p, err = _resolve_and_check(path) if err or p is None: return err if not p.is_dir(): return f"ERROR: not a directory: {path}" try: matches = sorted(p.glob(pattern)) if not matches: return f"No files matched '{pattern}' in {path}" lines = [f"Search '{pattern}' in {p}:", ""] count = 0 for m in matches: # Blacklist check name = m.name.lower() if name == "history.toml" or name.endswith("_history.toml"): continue rel = m.relative_to(p) kind = "file" if m.is_file() else "dir " lines.append(f" [{kind}] {rel}") count += 1 lines.append(f" ({count} match(es))") return "\n".join(lines) except Exception as e: return f"ERROR searching '{path}': {e}" def get_file_summary(path: str) -> str: """ Return the heuristic summary for a file (same as the initial context block). For .py files: imports, classes, methods, functions, constants. For .toml: table keys. For .md: headings. Others: line count + preview. """ p, err = _resolve_and_check(path) if err or p is None: return err if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: content = p.read_text(encoding="utf-8") return summarize.summarise_file(p, content) except Exception as e: return f"ERROR summarising '{path}': {e}" def py_get_skeleton(path: str) -> str: """ Returns a skeleton of a Python file (preserving docstrings, stripping function bodies). """ p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: from src.file_cache import ASTParser code = p.read_text(encoding="utf-8") parser = ASTParser("python") return parser.get_skeleton(code) except Exception as e: return f"ERROR generating skeleton for '{path}': {e}" def py_get_code_outline(path: str) -> str: """ Returns a hierarchical outline of a code file (classes, functions, methods with line ranges). """ p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: code = p.read_text(encoding="utf-8") return outline_tool.get_outline(p, code) except Exception as e: return f"ERROR generating outline for '{path}': {e}" def get_file_slice(path: str, start_line: int, end_line: int) -> str: """Return a specific line range from a file.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: lines = p.read_text(encoding="utf-8").splitlines(keepends=True) start_idx = start_line - 1 end_idx = end_line return "".join(lines[start_idx:end_idx]) except Exception as e: return f"ERROR reading slice from '{path}': {e}" def set_file_slice(path: str, start_line: int, end_line: int, new_content: str) -> str: """Replace a specific line range in a file with new content.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: lines = p.read_text(encoding="utf-8").splitlines(keepends=True) start_idx = start_line - 1 end_idx = end_line if new_content and not new_content.endswith("\n"): new_content += "\n" new_lines = new_content.splitlines(keepends=True) if new_content else [] lines[start_idx:end_idx] = new_lines p.write_text("".join(lines), encoding="utf-8") return f"Successfully updated lines {start_line}-{end_line} in {path}" except Exception as e: return f"ERROR updating slice in '{path}': {e}" def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: """ Replace exact string match in a file. Preserves indentation and line endings. Drop-in replacement for native edit tool that destroys 1-space indentation. """ p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" if not old_string: return "ERROR: old_string cannot be empty" try: content = p.read_text(encoding="utf-8") if old_string not in content: return f"ERROR: old_string not found in '{path}'" count = content.count(old_string) if count > 1 and not replace_all: return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique." if replace_all: new_content = content.replace(old_string, new_string) p.write_text(new_content, encoding="utf-8") return f"Successfully replaced {count} occurrences in '{path}'" else: new_content = content.replace(old_string, new_string, 1) p.write_text(new_content, encoding="utf-8") return f"Successfully replaced 1 occurrence in '{path}'" except Exception as e: return f"ERROR editing '{path}': {e}" def _get_symbol_node(tree: ast.AST, name: str) -> Optional[ast.AST]: """Helper to find an AST node by name (Class, Function, or Variable). Supports dot notation.""" parts = name.split(".") def find_in_scope(scope_node: Any, target_name: str) -> Optional[ast.AST]: # scope_node could be Module, ClassDef, or FunctionDef body = getattr(scope_node, "body", []) for node in body: if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name: return node if isinstance(node, ast.Assign): for t in node.targets: if isinstance(t, ast.Name) and t.id == target_name: return node if isinstance(node, ast.AnnAssign): if isinstance(node.target, ast.Name) and node.target.id == target_name: return node return None current = tree for part in parts: found = find_in_scope(current, part) if not found: return None current = found return current def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str: """ Returns (source_code, line_number) for a specific class, function, or method definition. If not found, returns an error string. """ p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) lines = code.splitlines(keepends=True) tree = ast.parse(code) node = _get_symbol_node(tree, name) if node: start = cast(int, getattr(node, "lineno")) end = cast(int, getattr(node, "end_lineno")) return ("".join(lines[start-1:end]), start) return f"ERROR: definition '{name}' not found in {path}" except Exception as e: return f"ERROR retrieving definition '{name}' from '{path}': {e}" def py_get_definition(path: str, name: str) -> str: """ Returns the source code for a specific class, function, or method definition. path: Path to the code file. name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method'). """ p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" if p.suffix != ".py": return f"ERROR: py_get_definition currently only supports .py files (unsupported: {p.suffix})" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) lines = code.splitlines(keepends=True) tree = ast.parse(code) node = _get_symbol_node(tree, name) if node: start = cast(int, getattr(node, "lineno")) - 1 end = cast(int, getattr(node, "end_lineno")) return "".join(lines[start:end]) return f"ERROR: definition '{name}' not found in {path}" except Exception as e: return f"ERROR retrieving definition '{name}' from '{path}': {e}" def py_update_definition(path: str, name: str, new_content: str) -> str: """Surgically replace the definition of a class or function.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node: return f"ERROR: could not find definition '{name}' in {path}" start = cast(int, getattr(node, "lineno")) end = cast(int, getattr(node, "end_lineno")) return set_file_slice(path, start, end, new_content) except Exception as e: return f"ERROR updating definition '{name}' in '{path}': {e}" def py_get_signature(path: str, name: str) -> str: """Returns only the signature part of a function or method (def line until colon).""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) lines = code.splitlines(keepends=True) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): return f"ERROR: could not find function/method '{name}' in {path}" start = node.lineno - 1 body_start = node.body[0].lineno - 1 sig_lines = lines[start:body_start] sig = "".join(sig_lines).strip() if sig.endswith(":"): return sig # If body is on the same line (e.g. def foo(): pass), we need to split by colon full_line = lines[start] colon_idx = full_line.find(":") if colon_idx != -1: return full_line[:colon_idx+1].strip() return sig except Exception as e: return f"ERROR retrieving signature '{name}' from '{path}': {e}" def py_set_signature(path: str, name: str, new_signature: str) -> str: """Surgically replace only the signature of a function/method.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) code.splitlines(keepends=True) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): return f"ERROR: could not find function/method '{name}' in {path}" start = node.lineno body_start_line = node.body[0].lineno # We replace from start until body_start_line - 1 # But we must be careful about comments/docstrings between sig and body # For now, we replace only the lines that contain the signature end = body_start_line - 1 return set_file_slice(path, start, end, new_signature) except Exception as e: return f"ERROR updating signature '{name}' in '{path}': {e}" def py_get_class_summary(path: str, name: str) -> str: """Returns a summary of a class: its methods and their signatures.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.exists(): return f"ERROR: file not found: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node or not isinstance(node, ast.ClassDef): return f"ERROR: could not find class '{name}' in {path}" lines = code.splitlines(keepends=True) summary = [f"Class: {name}"] doc = ast.get_docstring(node) if doc: summary.append(f" Docstring: {doc}") for body_node in node.body: if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)): start = body_node.lineno - 1 body_start = body_node.body[0].lineno - 1 sig = "".join(lines[start:body_start]).strip() summary.append(f" - {sig}") return "\n".join(summary) except Exception as e: return f"ERROR summarizing class '{name}' in '{path}': {e}" def py_get_var_declaration(path: str, name: str) -> str: """Get the assignment/declaration line(s) for a module-level or class-level variable.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) lines = code.splitlines(keepends=True) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)): return f"ERROR: could not find variable '{name}' in {path}" start = cast(int, getattr(node, "lineno")) - 1 end = cast(int, getattr(node, "end_lineno")) return "".join(lines[start:end]) except Exception as e: return f"ERROR retrieving variable '{name}' from '{path}': {e}" def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str: """Surgically replace a variable assignment/declaration.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF)) tree = ast.parse(code) node = _get_symbol_node(tree, name) if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)): return f"ERROR: could not find variable '{name}' in {path}" start = cast(int, getattr(node, "lineno")) end = cast(int, getattr(node, "end_lineno")) return set_file_slice(path, start, end, new_declaration) except Exception as e: return f"ERROR updating variable '{name}' in '{path}': {e}" def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str: """ Returns the git diff for a file or directory. base_rev: The base revision (default: HEAD) head_rev: The head revision (optional) """ p, err = _resolve_and_check(path) if err: return err assert p is not None cmd = ["git", "diff", base_rev] if head_rev: cmd.append(head_rev) cmd.extend(["--", str(p)]) try: result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8") return result.stdout if result.stdout else "(no changes)" except subprocess.CalledProcessError as e: return f"ERROR running git diff: {e.stderr}" except Exception as e: return f"ERROR: {e}" def py_find_usages(path: str, name: str) -> str: """Finds exact string matches of a symbol in a given file or directory.""" p, err = _resolve_and_check(path) if err: return err assert p is not None try: import re pattern = re.compile(r"\b" + re.escape(name) + r"\b") results = [] def _search_file(fp: Path) -> None: if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return if not _is_allowed(fp): return try: text = fp.read_text(encoding="utf-8") lines = text.splitlines() for i, line in enumerate(lines, 1): if pattern.search(line): rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd()) results.append(f"{rel}:{i}: {line.strip()[:100]}") except Exception: pass if p.is_file(): _search_file(p) else: for root, dirs, files in os.walk(p): dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')] for file in files: if file.endswith(('.py', '.md', '.toml', '.txt', '.json')): _search_file(Path(root) / file) if not results: return f"No usages found for '{name}' in {p}" if len(results) > 100: return "\n".join(results[:100]) + f"\n... (and {len(results)-100} more)" return "\n".join(results) except Exception as e: return f"ERROR finding usages for '{name}': {e}" def py_get_imports(path: str) -> str: """Parses a file's AST and returns a strict list of its dependencies.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: code = p.read_text(encoding="utf-8") tree = ast.parse(code) imports = [] for node in tree.body: if isinstance(node, ast.Import): for alias in node.names: imports.append(alias.name) elif isinstance(node, ast.ImportFrom): module = node.module or "" for alias in node.names: imports.append(f"{module}.{alias.name}" if module else alias.name) if not imports: return "No imports found." return "Imports:\n" + "\n".join(f" - {i}" for i in imports) except Exception as e: return f"ERROR getting imports for '{path}': {e}" def py_check_syntax(path: str) -> str: """Runs a quick syntax check on a Python file.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: code = p.read_text(encoding="utf-8") ast.parse(code) return f"Syntax OK: {path}" except SyntaxError as e: return f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}" except Exception as e: return f"ERROR checking syntax for '{path}': {e}" def py_get_hierarchy(path: str, class_name: str) -> str: """Scans the project to find subclasses of a given class.""" p, err = _resolve_and_check(path) if err: return err assert p is not None subclasses: list[str] = [] def _search_file(fp: Path) -> None: if not _is_allowed(fp): return try: code = fp.read_text(encoding="utf-8") tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, ast.ClassDef): for base in node.bases: if isinstance(base, ast.Name) and base.id == class_name: subclasses.append(f"{fp.name}: class {node.name}({class_name})") elif isinstance(base, ast.Attribute) and base.attr == class_name: if isinstance(base.value, ast.Name): subclasses.append(f"{fp.name}: class {node.name}({base.value.id}.{class_name})") except Exception: pass try: if p.is_file(): _search_file(p) else: for root, dirs, files in os.walk(p): dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')] for file in files: if file.endswith('.py'): _search_file(Path(root) / file) if not subclasses: return f"No subclasses of '{class_name}' found in {p}" return f"Subclasses of '{class_name}':\n" + "\n".join(f" - {s}" for s in subclasses) except Exception as e: return f"ERROR finding subclasses of '{class_name}': {e}" def py_get_docstring(path: str, name: str) -> str: """Extracts the docstring for a specific module, class, or function.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}" try: code = p.read_text(encoding="utf-8") tree = ast.parse(code) if not name or name == "module": doc = ast.get_docstring(tree) return doc if doc else "No module docstring found." node = _get_symbol_node(tree, name) if not node: return f"ERROR: could not find symbol '{name}' in {path}" if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module)): doc = ast.get_docstring(node) return doc if doc else f"No docstring found for '{name}'." return f"No docstring found for '{name}'." except Exception as e: return f"ERROR getting docstring for '{name}': {e}" def get_tree(path: str, max_depth: int = 2) -> str: """Returns a directory structure up to a max depth.""" p, err = _resolve_and_check(path) if err: return err assert p is not None if not p.is_dir(): return f"ERROR: not a directory: {path}" try: m_depth = max_depth def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]: if current_depth > m_depth: return [] lines = [] try: entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) except PermissionError: return [] # Filter entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")] for i, entry in enumerate(entries): is_last = (i == len(entries) - 1) connector = "└── " if is_last else "├── " lines.append(f"{prefix}{connector}{entry.name}") if entry.is_dir(): extension = " " if is_last else "│ " lines.extend(_build_tree(entry, current_depth + 1, prefix + extension)) return lines tree_lines = [f"{p.name}/"] + _build_tree(p, 1) return "\n".join(tree_lines) except Exception as e: return f"ERROR generating tree for '{path}': {e}" # ------------------------------------------------------------------ web tools class _DDGParser(HTMLParser): def __init__(self) -> None: super().__init__() self.results: list[dict[str, str]] = [] self.in_result: bool = False self.in_title: bool = False self.in_snippet: bool = False self.current_link: str = "" self.current_title: str = "" self.current_snippet: str = "" def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: attrs_dict = dict(attrs) if tag == "a" and "result__url" in cast(str, attrs_dict.get("class", "")): self.current_link = cast(str, attrs_dict.get("href", "")) if tag == "a" and "result__snippet" in cast(str, attrs_dict.get("class", "")): self.in_snippet = True if tag == "h2" and "result__title" in cast(str, attrs_dict.get("class", "")): self.in_title = True def handle_endtag(self, tag: str) -> None: if tag == "a" and self.in_snippet: self.in_snippet = False if tag == "h2" and self.in_title: self.in_title = False if self.current_link: self.results.append({ "title": self.current_title.strip(), "link": self.current_link, "snippet": self.current_snippet.strip() }) self.current_title = "" self.current_snippet = "" self.current_link = "" def handle_data(self, data: str) -> None: if self.in_title: self.current_title += data if self.in_snippet: self.current_snippet += data class _TextExtractor(HTMLParser): def __init__(self) -> None: super().__init__() self.text: list[str] = [] self.hide: int = 0 self.ignore_tags: set[str] = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'} def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: if tag in self.ignore_tags: self.hide += 1 def handle_endtag(self, tag: str) -> None: if tag in self.ignore_tags: self.hide -= 1 def handle_data(self, data: str) -> None: if self.hide == 0: cleaned = data.strip() if cleaned: self.text.append(cleaned) def web_search(query: str) -> str: """Search the web using DuckDuckGo HTML and return top results.""" url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query) req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}) try: with urllib.request.urlopen(req, timeout=10) as resp: html = resp.read().decode('utf-8', errors='ignore') parser = _DDGParser() parser.feed(html) if not parser.results: return f"No results found for '{query}'" lines = [f"Search Results for '{query}':"] for i, r in enumerate(parser.results[:5], 1): lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n") return "\n".join(lines) except Exception as e: return f"ERROR searching web for '{query}': {e}" def fetch_url(url: str) -> str: """Fetch a URL and return its text content (stripped of HTML tags).""" # Correct duckduckgo redirect links if passed if url.startswith("//duckduckgo.com/l/?uddg="): split_uddg = url.split("uddg=") if len(split_uddg) > 1: url = urllib.parse.unquote(split_uddg[1].split("&")[0]) if not url.startswith("http"): url = "https://" + url req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'}) try: with urllib.request.urlopen(req, timeout=10) as resp: html = resp.read().decode('utf-8', errors='ignore') parser = _TextExtractor() parser.feed(html) full_text = " ".join(parser.text) full_text = _re.sub(r'\s+', ' ', full_text) if not full_text.strip(): return f"FETCH OK: No readable text extracted from {url}. The page might be empty, JavaScript-heavy, or blocked." # Limit to 40k chars to prevent context blowup if len(full_text) > 40000: return full_text[:40000] + "\n... (content truncated)" return full_text except Exception as e: return f"ERROR fetching URL '{url}': {e}" def get_ui_performance() -> str: """Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).""" if perf_monitor_callback is None: return "INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running." try: metrics = perf_monitor_callback() # Clean up the dict string for the AI metric_str = str(metrics) for char in "{}'": metric_str = metric_str.replace(char, "") return f"UI Performance Snapshot:\n{metric_str}" except Exception as e: return f"ERROR: Failed to retrieve UI performance: {str(e)}" # ------------------------------------------------------------------ tool dispatch TOOL_NAMES: set[str] = {"read_file", "list_directory", "search_files", "get_file_summary", "py_get_skeleton", "py_get_code_outline", "py_get_definition", "get_git_diff", "web_search", "fetch_url", "get_ui_performance", "get_file_slice", "set_file_slice", "edit_file", "py_update_definition", "py_get_signature", "py_set_signature", "py_get_class_summary", "py_get_var_declaration", "py_set_var_declaration", "py_find_usages", "py_get_imports", "py_check_syntax", "py_get_hierarchy", "py_get_docstring", "get_tree"} def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str: """ Dispatch an MCP tool call by name. Returns the result as a string. """ # Handle aliases path = str(tool_input.get("path", tool_input.get("file_path", tool_input.get("dir_path", "")))) if tool_name == "read_file": return read_file(path) if tool_name == "list_directory": return list_directory(path) if tool_name == "search_files": return search_files(path, str(tool_input.get("pattern", "*"))) if tool_name == "get_file_summary": return get_file_summary(path) if tool_name == "py_get_skeleton": return py_get_skeleton(path) if tool_name == "py_get_code_outline": return py_get_code_outline(path) if tool_name == "py_get_definition": return py_get_definition(path, str(tool_input.get("name", ""))) if tool_name == "py_update_definition": return py_update_definition(path, str(tool_input.get("name", "")), str(tool_input.get("new_content", ""))) if tool_name == "py_get_signature": return py_get_signature(path, str(tool_input.get("name", ""))) if tool_name == "py_set_signature": return py_set_signature(path, str(tool_input.get("name", "")), str(tool_input.get("new_signature", ""))) if tool_name == "py_get_class_summary": return py_get_class_summary(path, str(tool_input.get("name", ""))) if tool_name == "py_get_var_declaration": return py_get_var_declaration(path, str(tool_input.get("name", ""))) if tool_name == "py_set_var_declaration": return py_set_var_declaration(path, str(tool_input.get("name", "")), str(tool_input.get("new_declaration", ""))) if tool_name == "get_file_slice": return get_file_slice(path, int(tool_input.get("start_line", 1)), int(tool_input.get("end_line", 1))) if tool_name == "set_file_slice": return set_file_slice(path, int(tool_input.get("start_line", 1)), int(tool_input.get("end_line", 1)), str(tool_input.get("new_content", ""))) if tool_name == "get_git_diff": return get_git_diff( path, str(tool_input.get("base_rev", "HEAD")), str(tool_input.get("head_rev", "")) ) if tool_name == "edit_file": return edit_file( path, str(tool_input.get("old_string", "")), str(tool_input.get("new_string", "")), bool(tool_input.get("replace_all", False)) ) if tool_name == "web_search": return web_search(str(tool_input.get("query", ""))) if tool_name == "fetch_url": return fetch_url(str(tool_input.get("url", ""))) if tool_name == "get_ui_performance": return get_ui_performance() if tool_name == "py_find_usages": return py_find_usages(path, str(tool_input.get("name", ""))) if tool_name == "py_get_imports": return py_get_imports(path) if tool_name == "py_check_syntax": return py_check_syntax(path) if tool_name == "py_get_hierarchy": return py_get_hierarchy(path, str(tool_input.get("class_name", ""))) if tool_name == "py_get_docstring": return py_get_docstring(path, str(tool_input.get("name", ""))) if tool_name == "get_tree": return get_tree(path, int(tool_input.get("max_depth", 2))) return f"ERROR: unknown MCP tool '{tool_name}'" async def async_dispatch(tool_name: str, tool_input: dict[str, Any]) -> str: """ Dispatch an MCP tool call by name asynchronously. Returns the result as a string. """ # Run blocking I/O bound tools in a thread to allow parallel execution via asyncio.gather return await asyncio.to_thread(dispatch, tool_name, tool_input) def get_tool_schemas() -> list[dict[str, Any]]: """Returns the list of tool specifications for the AI.""" return list(MCP_TOOL_SPECS) # ------------------------------------------------------------------ tool schema helpers # These are imported by ai_client.py to build provider-specific declarations. MCP_TOOL_SPECS: list[dict[str, Any]] = [ { "name": "read_file", "description": ( "Read the full UTF-8 content of a file within the allowed project paths. " "Use get_file_summary first to decide whether you need the full content." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute or relative path to the file to read.", } }, "required": ["path"], }, }, { "name": "list_directory", "description": ( "List files and subdirectories within an allowed directory. " "Shows name, type (file/dir), and size. Use this to explore the project structure." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the directory to list.", } }, "required": ["path"], }, }, { "name": "search_files", "description": ( "Search for files matching a glob pattern within an allowed directory. " "Supports recursive patterns like '**/*.py'. " "Use this to find files by extension or name pattern." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the directory to search within.", }, "pattern": { "type": "string", "description": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.", }, }, "required": ["path", "pattern"], }, }, { "name": "get_file_summary", "description": ( "Get a compact heuristic summary of a file without reading its full content. " "For Python: imports, classes, methods, functions, constants. " "For TOML: table keys. For Markdown: headings. Others: line count + preview. " "Use this before read_file to decide if you need the full content." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute or relative path to the file to summarise.", } }, "required": ["path"], }, }, { "name": "py_get_skeleton", "description": ( "Get a skeleton view of a Python file. " "This returns all classes and function signatures with their docstrings, " "but replaces function bodies with '...'. " "Use this to understand module interfaces without reading the full implementation." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file.", } }, "required": ["path"], }, }, { "name": "py_get_code_outline", "description": ( "Get a hierarchical outline of a code file. " "This returns classes, functions, and methods with their line ranges and brief docstrings. " "Use this to quickly map out a file's structure before reading specific sections." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the code file (currently supports .py).", } }, "required": ["path"], }, }, { "name": "get_file_slice", "description": "Read a specific line range from a file. Useful for reading parts of very large files.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file." }, "start_line": { "type": "integer", "description": "1-based start line number." }, "end_line": { "type": "integer", "description": "1-based end line number (inclusive)." } }, "required": ["path", "start_line", "end_line"] } }, { "name": "set_file_slice", "description": "Replace a specific line range in a file with new content. Surgical edit tool.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file." }, "start_line": { "type": "integer", "description": "1-based start line number." }, "end_line": { "type": "integer", "description": "1-based end line number (inclusive)." }, "new_content": { "type": "string", "description": "New content to insert." } }, "required": ["path", "start_line", "end_line", "new_content"] } }, { "name": "edit_file", "description": "Replace exact string match in a file. Preserves indentation and line endings. Drop-in replacement for native edit tool.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file." }, "old_string": { "type": "string", "description": "The text to replace." }, "new_string": { "type": "string", "description": "The replacement text." }, "replace_all": { "type": "boolean", "description": "Replace all occurrences. Default false." } }, "required": ["path", "old_string", "new_string"] } }, { "name": "py_get_definition", "description": ( "Get the full source code of a specific class, function, or method definition. " "This is more efficient than reading the whole file if you know what you're looking for." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file.", }, "name": { "type": "string", "description": "The name of the class or function to retrieve. Use 'ClassName.method_name' for methods.", } }, "required": ["path", "name"], }, }, { "name": "py_update_definition", "description": "Surgically replace the definition of a class or function in a Python file using AST to find line ranges.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of class/function/method." }, "new_content": { "type": "string", "description": "Complete new source for the definition." } }, "required": ["path", "name", "new_content"] } }, { "name": "py_get_signature", "description": "Get only the signature part of a Python function or method (from def until colon).", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of the function/method (e.g. 'ClassName.method_name')." } }, "required": ["path", "name"] } }, { "name": "py_set_signature", "description": "Surgically replace only the signature of a Python function or method.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of the function/method." }, "new_signature": { "type": "string", "description": "Complete new signature string (including def and trailing colon)." } }, "required": ["path", "name", "new_signature"] } }, { "name": "py_get_class_summary", "description": "Get a summary of a Python class, listing its docstring and all method signatures.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of the class." } }, "required": ["path", "name"] } }, { "name": "py_get_var_declaration", "description": "Get the assignment/declaration line for a variable.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of the variable." } }, "required": ["path", "name"] } }, { "name": "py_set_var_declaration", "description": "Surgically replace a variable assignment/declaration.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of the variable." }, "new_declaration": { "type": "string", "description": "Complete new assignment/declaration string." } }, "required": ["path", "name", "new_declaration"] } }, { "name": "get_git_diff", "description": ( "Returns the git diff for a file or directory. " "Use this to review changes efficiently without reading entire files." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file or directory.", }, "base_rev": { "type": "string", "description": "Base revision (e.g. 'HEAD', 'HEAD~1', or a commit hash). Defaults to 'HEAD'.", }, "head_rev": { "type": "string", "description": "Head revision (optional).", } }, "required": ["path"], }, }, { "name": "web_search", "description": "Search the web using DuckDuckGo. Returns the top 5 search results with titles, URLs, and snippets. Chain this with fetch_url to read specific pages.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query." } }, "required": ["query"] } }, { "name": "fetch_url", "description": "Fetch the full text content of a URL (stripped of HTML tags). Use this after web_search to read relevant information from the web.", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "The full URL to fetch." } }, "required": ["url"] } }, { "name": "get_ui_performance", "description": "Get a snapshot of the current UI performance metrics, including FPS, Frame Time (ms), CPU usage (%), and Input Lag (ms). Use this to diagnose UI slowness or verify that your changes haven't degraded the user experience.", "parameters": { "type": "object", "properties": {} } }, { "name": "py_find_usages", "description": "Finds exact string matches of a symbol in a given file or directory.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to file or directory to search." }, "name": { "type": "string", "description": "The symbol/string to search for." } }, "required": ["path", "name"] } }, { "name": "py_get_imports", "description": "Parses a file's AST and returns a strict list of its dependencies.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." } }, "required": ["path"] } }, { "name": "py_check_syntax", "description": "Runs a quick syntax check on a Python file.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." } }, "required": ["path"] } }, { "name": "py_get_hierarchy", "description": "Scans the project to find subclasses of a given class.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Directory path to search in." }, "class_name": { "type": "string", "description": "Name of the base class." } }, "required": ["path", "class_name"] } }, { "name": "py_get_docstring", "description": "Extracts the docstring for a specific module, class, or function.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the .py file." }, "name": { "type": "string", "description": "Name of symbol or 'module' for the file docstring." } }, "required": ["path", "name"] } }, { "name": "get_tree", "description": "Returns a directory structure up to a max depth.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Directory path." }, "max_depth": { "type": "integer", "description": "Maximum depth to recurse (default 2)." } }, "required": ["path"] } } ]