# mcp_client.py """ Note(Gemini): MCP-style file context tools for manual_slop. Exposes read-only filesystem tools the AI can call to selectively fetch file content on demand, instead of having everything inlined into the context block. All access is restricted to paths that are either: - Explicitly listed in the project's allowed_paths set, OR - Contained within an allowed base_dir (must resolve to a subpath of it) This is heavily inspired by Claude's own tooling limits. We enforce safety here so the AI doesn't wander outside the project workspace. """ # mcp_client.py #MCP-style file context tools for manual_slop. # Exposes read-only filesystem tools the AI can call to selectively fetch file # content on demand, instead of having everything inlined into the context block. # All access is restricted to paths that are either: # - Explicitly listed in the project's allowed_paths set, OR # - Contained within an allowed base_dir (must resolve to a subpath of it) # Tools exposed: # read_file(path) - return full UTF-8 content of a file # list_directory(path) - list entries in a directory (names + type) # search_files(path, pattern) - glob pattern search within an allowed dir # get_file_summary(path) - return the summarize.py heuristic summary # from pathlib import Path import summarize import urllib.request import urllib.parse from html.parser import HTMLParser import re as _re # ------------------------------------------------------------------ state # Set by configure() before the AI send loop starts. # allowed_paths : set of resolved absolute Path objects (files or dirs) # base_dirs : set of resolved absolute Path dirs that act as roots _allowed_paths: set[Path] = set() _base_dirs: set[Path] = set() _primary_base_dir: Path | None = None # Injected by gui.py - returns a dict of performance metrics perf_monitor_callback = None def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None): """ Build the allowlist from aggregate file_items. Called by ai_client before each send so the list reflects the current project. file_items : list of dicts from aggregate.build_file_items() extra_base_dirs : additional directory roots to allow traversal of """ global _allowed_paths, _base_dirs, _primary_base_dir _allowed_paths = set() _base_dirs = set() _primary_base_dir = Path(extra_base_dirs[0]).resolve() if extra_base_dirs else Path.cwd() for item in file_items: p = item.get("path") if p is not None: rp = Path(p).resolve() _allowed_paths.add(rp) _base_dirs.add(rp.parent) if extra_base_dirs: for d in extra_base_dirs: dp = Path(d).resolve() if dp.is_dir(): _base_dirs.add(dp) def _is_allowed(path: Path) -> bool: """ Return True if `path` is within the allowlist. A path is allowed if: - it is explicitly in _allowed_paths, OR - it is contained within (or equal to) one of the _base_dirs """ rp = path.resolve() if rp in _allowed_paths: return True for bd in _base_dirs: try: rp.relative_to(bd) return True except ValueError: continue return False def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]: """ Resolve raw_path and verify it passes the allowlist check. Returns (resolved_path, error_string). error_string is empty on success. """ try: p = Path(raw_path) if not p.is_absolute() and _primary_base_dir: p = _primary_base_dir / p p = p.resolve() except Exception as e: return None, f"ERROR: invalid path '{raw_path}': {e}" if not _is_allowed(p): return None, ( f"ACCESS DENIED: '{raw_path}' is not within the allowed paths. " f"Use list_directory or search_files on an allowed base directory first." ) return p, "" # ------------------------------------------------------------------ tool implementations def read_file(path: str) -> str: """Return the UTF-8 content of a file, or an error string.""" p, err = _resolve_and_check(path) if err: return err if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: return p.read_text(encoding="utf-8") except Exception as e: return f"ERROR reading '{path}': {e}" def list_directory(path: str) -> str: """List entries in a directory. Returns a compact text table.""" p, err = _resolve_and_check(path) if err: return err if not p.exists(): return f"ERROR: path not found: {path}" if not p.is_dir(): return f"ERROR: not a directory: {path}" try: entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) lines = [f"Directory: {p}", ""] for entry in entries: kind = "file" if entry.is_file() else "dir " size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else "" lines.append(f" [{kind}] {entry.name:<40} {size}") lines.append(f" ({len(entries)} entries)") return "\n".join(lines) except Exception as e: return f"ERROR listing '{path}': {e}" def search_files(path: str, pattern: str) -> str: """ Search for files matching a glob pattern within path. pattern examples: '*.py', '**/*.toml', 'src/**/*.rs' """ p, err = _resolve_and_check(path) if err: return err if not p.is_dir(): return f"ERROR: not a directory: {path}" try: matches = sorted(p.glob(pattern)) if not matches: return f"No files matched '{pattern}' in {path}" lines = [f"Search '{pattern}' in {p}:", ""] for m in matches: rel = m.relative_to(p) kind = "file" if m.is_file() else "dir " lines.append(f" [{kind}] {rel}") lines.append(f" ({len(matches)} match(es))") return "\n".join(lines) except Exception as e: return f"ERROR searching '{path}': {e}" def get_file_summary(path: str) -> str: """ Return the heuristic summary for a file (same as the initial context block). For .py files: imports, classes, methods, functions, constants. For .toml: table keys. For .md: headings. Others: line count + preview. """ p, err = _resolve_and_check(path) if err: return err if not p.exists(): return f"ERROR: file not found: {path}" if not p.is_file(): return f"ERROR: not a file: {path}" try: content = p.read_text(encoding="utf-8") return summarize.summarise_file(p, content) except Exception as e: return f"ERROR summarising '{path}': {e}" # ------------------------------------------------------------------ web tools class _DDGParser(HTMLParser): def __init__(self): super().__init__() self.results = [] self.in_result = False self.in_title = False self.in_snippet = False self.current_link = "" self.current_title = "" self.current_snippet = "" def handle_starttag(self, tag, attrs): attrs = dict(attrs) if tag == "a" and "result__url" in attrs.get("class", ""): self.current_link = attrs.get("href", "") if tag == "a" and "result__snippet" in attrs.get("class", ""): self.in_snippet = True if tag == "h2" and "result__title" in attrs.get("class", ""): self.in_title = True def handle_endtag(self, tag): if tag == "a" and self.in_snippet: self.in_snippet = False if tag == "h2" and self.in_title: self.in_title = False if self.current_link: self.results.append({ "title": self.current_title.strip(), "link": self.current_link, "snippet": self.current_snippet.strip() }) self.current_title = "" self.current_snippet = "" self.current_link = "" def handle_data(self, data): if self.in_title: self.current_title += data if self.in_snippet: self.current_snippet += data class _TextExtractor(HTMLParser): def __init__(self): super().__init__() self.text = [] self.hide = 0 self.ignore_tags = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'} def handle_starttag(self, tag, attrs): if tag in self.ignore_tags: self.hide += 1 def handle_endtag(self, tag): if tag in self.ignore_tags: self.hide -= 1 def handle_data(self, data): if self.hide == 0: cleaned = data.strip() if cleaned: self.text.append(cleaned) def web_search(query: str) -> str: """Search the web using DuckDuckGo HTML and return top results.""" url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query) req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}) try: html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore') parser = _DDGParser() parser.feed(html) if not parser.results: return f"No results found for '{query}'" lines = [f"Search Results for '{query}':"] for i, r in enumerate(parser.results[:5], 1): lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n") return "\n".join(lines) except Exception as e: return f"ERROR searching web for '{query}': {e}" def fetch_url(url: str) -> str: """Fetch a URL and return its text content (stripped of HTML tags).""" # Correct duckduckgo redirect links if passed if url.startswith("//duckduckgo.com/l/?uddg="): url = urllib.parse.unquote(url.split("uddg=")[1].split("&")[0]) if not url.startswith("http"): url = "https://" + url req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}) try: html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore') parser = _TextExtractor() parser.feed(html) full_text = " ".join(parser.text) full_text = _re.sub(r'\s+', ' ', full_text) # Limit to 40k chars to prevent context blowup if len(full_text) > 40000: return full_text[:40000] + "\n... (content truncated)" return full_text except Exception as e: return f"ERROR fetching URL '{url}': {e}" def get_ui_performance() -> str: """Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).""" if perf_monitor_callback is None: return "ERROR: Performance monitor callback not registered." try: metrics = perf_monitor_callback() # Clean up the dict string for the AI metric_str = str(metrics) for char in "{}'": metric_str = metric_str.replace(char, "") return f"UI Performance Snapshot:\n{metric_str}" except Exception as e: return f"ERROR: Failed to retrieve UI performance: {str(e)}" # ------------------------------------------------------------------ tool dispatch TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url", "get_ui_performance"} def dispatch(tool_name: str, tool_input: dict) -> str: """ Dispatch an MCP tool call by name. Returns the result as a string. """ if tool_name == "read_file": return read_file(tool_input.get("path", "")) if tool_name == "list_directory": return list_directory(tool_input.get("path", "")) if tool_name == "search_files": return search_files(tool_input.get("path", ""), tool_input.get("pattern", "*")) if tool_name == "get_file_summary": return get_file_summary(tool_input.get("path", "")) if tool_name == "web_search": return web_search(tool_input.get("query", "")) if tool_name == "fetch_url": return fetch_url(tool_input.get("url", "")) if tool_name == "get_ui_performance": return get_ui_performance() return f"ERROR: unknown MCP tool '{tool_name}'" # ------------------------------------------------------------------ tool schema helpers # These are imported by ai_client.py to build provider-specific declarations. MCP_TOOL_SPECS = [ { "name": "read_file", "description": ( "Read the full UTF-8 content of a file within the allowed project paths. " "Use get_file_summary first to decide whether you need the full content." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute or relative path to the file to read.", } }, "required": ["path"], }, }, { "name": "list_directory", "description": ( "List files and subdirectories within an allowed directory. " "Shows name, type (file/dir), and size. Use this to explore the project structure." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the directory to list.", } }, "required": ["path"], }, }, { "name": "search_files", "description": ( "Search for files matching a glob pattern within an allowed directory. " "Supports recursive patterns like '**/*.py'. " "Use this to find files by extension or name pattern." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the directory to search within.", }, "pattern": { "type": "string", "description": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.", }, }, "required": ["path", "pattern"], }, }, { "name": "get_file_summary", "description": ( "Get a compact heuristic summary of a file without reading its full content. " "For Python: imports, classes, methods, functions, constants. " "For TOML: table keys. For Markdown: headings. Others: line count + preview. " "Use this before read_file to decide if you need the full content." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute or relative path to the file to summarise.", } }, "required": ["path"], }, }, { "name": "web_search", "description": "Search the web using DuckDuckGo. Returns the top 5 search results with titles, URLs, and snippets. Chain this with fetch_url to read specific pages.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query." } }, "required": ["query"] } }, { "name": "get_ui_performance", "description": "Get a snapshot of the current UI performance metrics, including FPS, Frame Time (ms), CPU usage (%), and Input Lag (ms). Use this to diagnose UI slowness or verify that your changes haven't degraded the user experience.", "parameters": { "type": "object", "properties": {} } } ]