Files
manual_slop/mcp_client.py
2026-02-22 09:20:02 -05:00

432 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# mcp_client.py
"""
Note(Gemini):
MCP-style file context tools for manual_slop.
Exposes read-only filesystem tools the AI can call to selectively fetch file
content on demand, instead of having everything inlined into the context block.
All access is restricted to paths that are either:
- Explicitly listed in the project's allowed_paths set, OR
- Contained within an allowed base_dir (must resolve to a subpath of it)
This is heavily inspired by Claude's own tooling limits. We enforce safety here
so the AI doesn't wander outside the project workspace.
"""
# mcp_client.py
# MCP-style file context tools for manual_slop.
# Exposes read-only filesystem tools the AI can call to selectively fetch file
# content on demand, instead of having everything inlined into the context block.
# All access is restricted to paths that are either:
# - Explicitly listed in the project's allowed_paths set, OR
# - Contained within an allowed base_dir (must resolve to a subpath of it)
# Tools exposed:
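# read_file(path) - return full UTF-8 content of a file
# list_directory(path) - list entries in a directory (names + type + file size)
# search_files(path, pattern) - glob pattern search within an allowed dir
# get_file_summary(path) - return the summarize.py heuristic summary
# web_search(query) - return the top DuckDuckGo results for a query
# fetch_url(url) - return the text content of a web page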
#
from pathlib import Path
import summarize
import urllib.request
import urllib.parse
from html.parser import HTMLParser
import re as _re
# ------------------------------------------------------------------ state
# Set by configure() before the AI send loop starts.
# allowed_paths : set of resolved absolute Path objects (files or dirs)
# base_dirs : set of resolved absolute Path dirs that act as roots
# Resolved paths that are allowed explicitly; replaced wholesale by configure().
_allowed_paths: set[Path] = set()
# Resolved directory roots; any path at or beneath one of them passes _is_allowed().
_base_dirs: set[Path] = set()
def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None):
"""
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
"""
global _allowed_paths, _base_dirs
_allowed_paths = set()
_base_dirs = set()
for item in file_items:
p = item.get("path")
if p is not None:
rp = Path(p).resolve()
_allowed_paths.add(rp)
_base_dirs.add(rp.parent)
if extra_base_dirs:
for d in extra_base_dirs:
dp = Path(d).resolve()
if dp.is_dir():
_base_dirs.add(dp)
def _is_allowed(path: Path) -> bool:
    """
    Tell whether `path` passes the allowlist.

    The path is resolved first, then accepted when it is either
    - one of the entries in _allowed_paths, OR
    - equal to, or located beneath, one of the _base_dirs
    """
    resolved = path.resolve()
    if resolved in _allowed_paths:
        return True
    return any(resolved.is_relative_to(root) for root in _base_dirs)
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
    """
    Turn raw_path into a resolved Path and run it through the allowlist.

    Returns a (resolved_path, error_string) pair. On success the error string is
    empty; on failure the path is None and the string explains what went wrong.
    """
    try:
        resolved = Path(raw_path).resolve()
    except Exception as exc:
        return None, f"ERROR: invalid path '{raw_path}': {exc}"

    if _is_allowed(resolved):
        return resolved, ""

    denied = (
        f"ACCESS DENIED: '{raw_path}' is not within the allowed paths. "
        f"Use list_directory or search_files on an allowed base directory first."
    )
    return None, denied
# ------------------------------------------------------------------ tool implementations
def read_file(path: str) -> str:
    """
    Read an allowed file as UTF-8 text.

    Returns the file content, or an error string when the path is denied,
    missing, not a regular file, or cannot be read.
    """
    target, problem = _resolve_and_check(path)
    if problem:
        return problem
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
    except Exception as exc:
        return f"ERROR reading '{path}': {exc}"
    return text
def list_directory(path: str) -> str:
    """
    Describe the entries of an allowed directory as a compact text table.

    Directories are listed before files, each group ordered case-insensitively
    by name; files also show their size in bytes. Returns an error string when
    the path is denied, missing, not a directory, or cannot be listed.
    """
    folder, problem = _resolve_and_check(path)
    if problem:
        return problem
    if not folder.exists():
        return f"ERROR: path not found: {path}"
    if not folder.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        children = list(folder.iterdir())
        # False sorts before True, so non-files (directories) come first.
        children.sort(key=lambda child: (child.is_file(), child.name.lower()))
        report = [f"Directory: {folder}", ""]
        for child in children:
            if child.is_file():
                kind, size = "file", f"{child.stat().st_size:>10,} bytes"
            else:
                kind, size = "dir ", ""
            report.append(f" [{kind}] {child.name:<40} {size}")
        report.append(f" ({len(children)} entries)")
        return "\n".join(report)
    except Exception as exc:
        return f"ERROR listing '{path}': {exc}"
def search_files(path: str, pattern: str) -> str:
    """
    Search for files matching a glob pattern within path.

    pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'

    path    : directory to search; must pass the allowlist check
    pattern : glob pattern, evaluated relative to path

    Returns a text listing of the matches (relative to path), a "no files
    matched" message, or an error string. Matches that fall outside the
    allowlist are dropped from the listing.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        # Only the search root was validated above. A pattern containing '..'
        # components, or a symlink inside the root, can produce hits outside
        # the allowed workspace, so every match is re-checked individually.
        matches = sorted(m for m in p.glob(pattern) if _is_allowed(m))
        if not matches:
            return f"No files matched '{pattern}' in {path}"
        lines = [f"Search '{pattern}' in {p}:", ""]
        for m in matches:
            rel = m.relative_to(p)
            kind = "file" if m.is_file() else "dir "
            lines.append(f" [{kind}] {rel}")
        lines.append(f" ({len(matches)} match(es))")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching '{path}': {e}"
def get_file_summary(path: str) -> str:
    """
    Produce the heuristic summary for an allowed file (same as the initial
    context block).

    For .py files: imports, classes, methods, functions, constants.
    For .toml: table keys. For .md: headings. Others: line count + preview.

    Returns an error string when the path is denied, missing, not a regular
    file, or when reading or summarising fails.
    """
    target, problem = _resolve_and_check(path)
    if problem:
        return problem
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        return summarize.summarise_file(target, target.read_text(encoding="utf-8"))
    except Exception as exc:
        return f"ERROR summarising '{path}': {exc}"
# ------------------------------------------------------------------ web tools
class _DDGParser(HTMLParser):
    """
    Collect search results from a DuckDuckGo HTML results page.

    After feed(), `results` holds one dict per emitted result with the keys
    "title", "link" and "snippet". Elements are recognised by their CSS class:
    - <a class="...result__url...">     : its href becomes the pending link
    - <a class="...result__snippet...">  : its text becomes the pending snippet
    - <h2 class="...result__title...">   : its text becomes the pending title

    NOTE(review): a result is emitted only when a title <h2> closes while a
    link is already pending, and anything still pending at end of input is not
    flushed. Confirm this matches the element order DuckDuckGo actually serves.
    """

    def __init__(self):
        super().__init__()
        self.results = []
        self.in_result = False
        self.in_title = False
        self.in_snippet = False
        self.current_link = ""
        self.current_title = ""
        self.current_snippet = ""

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        # HTMLParser reports a valueless attribute (e.g. `<a class>`) as None;
        # normalise to "" so the substring tests below cannot raise TypeError.
        css_class = attrs.get("class") or ""
        if tag == "a" and "result__url" in css_class:
            self.current_link = attrs.get("href") or ""
        if tag == "a" and "result__snippet" in css_class:
            self.in_snippet = True
        if tag == "h2" and "result__title" in css_class:
            self.in_title = True

    def handle_endtag(self, tag):
        if tag == "a" and self.in_snippet:
            self.in_snippet = False
        if tag == "h2" and self.in_title:
            self.in_title = False
            if self.current_link:
                self.results.append({
                    "title": self.current_title.strip(),
                    "link": self.current_link,
                    "snippet": self.current_snippet.strip(),
                })
                # Start the next result from a clean slate.
                self.current_title = ""
                self.current_snippet = ""
                self.current_link = ""

    def handle_data(self, data):
        if self.in_title:
            self.current_title += data
        if self.in_snippet:
            self.current_snippet += data
class _TextExtractor(HTMLParser):
    """
    Gather the visible text of an HTML document.

    After feed(), `text` is the list of stripped, non-empty text fragments
    found outside the elements named in `ignore_tags`. `hide` counts how many
    ignored elements are currently open; text is kept only while it is 0.
    """

    # Void elements never get an end tag, so they must not take part in the
    # open/close counting. Otherwise a single <meta> would leave `hide` above
    # zero and suppress all text that follows it.
    _VOID_TAGS = frozenset({
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'source', 'track', 'wbr',
    })

    def __init__(self):
        super().__init__()
        self.text = []
        self.hide = 0
        self.ignore_tags = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'}

    def _counts(self, tag):
        """Return True when `tag` opens or closes a text-hiding region."""
        return tag in self.ignore_tags and tag not in self._VOID_TAGS

    def handle_starttag(self, tag, attrs):
        if self._counts(tag):
            self.hide += 1

    def handle_endtag(self, tag):
        # Never go below zero: a stray closing tag in malformed HTML must not
        # hide the remainder of the document.
        if self._counts(tag) and self.hide > 0:
            self.hide -= 1

    def handle_data(self, data):
        if self.hide == 0:
            cleaned = data.strip()
            if cleaned:
                self.text.append(cleaned)
def web_search(query: str) -> str:
    """
    Search the web using DuckDuckGo HTML and return top results.

    query : the search terms; percent-encoded before being placed in the URL

    Returns a text block listing up to five results (title, URL, snippet), a
    "no results" message, or an error string if the request or parsing fails.
    """
    url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        # The context manager closes the connection even when read() fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            html = resp.read().decode('utf-8', errors='ignore')
        parser = _DDGParser()
        parser.feed(html)
        if not parser.results:
            return f"No results found for '{query}'"
        lines = [f"Search Results for '{query}':"]
        for i, r in enumerate(parser.results[:5], 1):
            lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching web for '{query}': {e}"
def fetch_url(url: str) -> str:
    """
    Fetch a URL and return its text content (stripped of HTML tags).

    url : address to fetch. DuckDuckGo redirect links are unwrapped,
          protocol-relative URLs ("//host/...") and bare hosts are given an
          https scheme.

    Returns the page text with whitespace collapsed, truncated to 40,000
    characters, or an error string if the request or parsing fails.
    """
    # Correct duckduckgo redirect links if passed
    if url.startswith("//duckduckgo.com/l/?uddg="):
        url = urllib.parse.unquote(url.split("uddg=")[1].split("&")[0])
    # Test for a real scheme, not just the letters "http": a bare host such as
    # "httpbin.org" still needs one.
    if not url.lower().startswith(("http://", "https://")):
        if url.startswith("//"):
            # Protocol-relative URL: only the scheme name is missing.
            url = "https:" + url
        else:
            url = "https://" + url
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        # The context manager closes the connection even when read() fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            html = resp.read().decode('utf-8', errors='ignore')
        parser = _TextExtractor()
        parser.feed(html)
        full_text = " ".join(parser.text)
        full_text = _re.sub(r'\s+', ' ', full_text)
        # Limit to 40k chars to prevent context blowup
        if len(full_text) > 40000:
            return full_text[:40000] + "\n... (content truncated)"
        return full_text
    except Exception as e:
        return f"ERROR fetching URL '{url}': {e}"
# ------------------------------------------------------------------ tool dispatch
# Names of all tools that dispatch() knows how to route.
TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"}
def dispatch(tool_name: str, tool_input: dict) -> str:
"""
Dispatch an MCP tool call by name. Returns the result as a string.
"""
if tool_name == "read_file":
return read_file(tool_input.get("path", ""))
if tool_name == "list_directory":
return list_directory(tool_input.get("path", ""))
if tool_name == "search_files":
return search_files(tool_input.get("path", ""), tool_input.get("pattern", "*"))
if tool_name == "get_file_summary":
return get_file_summary(tool_input.get("path", ""))
if tool_name == "web_search":
return web_search(tool_input.get("query", ""))
if tool_name == "fetch_url":
return fetch_url(tool_input.get("url", ""))
return f"ERROR: unknown MCP tool '{tool_name}'"
# ------------------------------------------------------------------ tool schema helpers
# These are imported by ai_client.py to build provider-specific declarations.
# One entry per tool: its name, a description for the model, and a JSON-schema
# style "parameters" object describing the arguments.
MCP_TOOL_SPECS = [
    # --- filesystem tools
    {
        "name": "read_file",
        "description": (
            "Read the full UTF-8 content of a file within the allowed project paths. "
            "Use get_file_summary first to decide whether you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to read.",
                },
            },
            "required": ["path"],
        },
    },
    {
        "name": "list_directory",
        "description": (
            "List files and subdirectories within an allowed directory. "
            "Shows name, type (file/dir), and size. Use this to explore the project structure."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to list.",
                },
            },
            "required": ["path"],
        },
    },
    {
        "name": "search_files",
        "description": (
            "Search for files matching a glob pattern within an allowed directory. "
            "Supports recursive patterns like '**/*.py'. "
            "Use this to find files by extension or name pattern."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to search within.",
                },
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.",
                },
            },
            "required": ["path", "pattern"],
        },
    },
    {
        "name": "get_file_summary",
        "description": (
            "Get a compact heuristic summary of a file without reading its full content. "
            "For Python: imports, classes, methods, functions, constants. "
            "For TOML: table keys. For Markdown: headings. Others: line count + preview. "
            "Use this before read_file to decide if you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to summarise.",
                },
            },
            "required": ["path"],
        },
    },
    # --- web tools
    {
        "name": "web_search",
        "description": (
            "Search the web using DuckDuckGo. "
            "Returns the top 5 search results with titles, URLs, and snippets. "
            "Chain this with fetch_url to read specific pages."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query.",
                },
            },
            "required": ["query"],
        },
    },
    {
        "name": "fetch_url",
        "description": (
            "Fetch a webpage and extract its text content, removing HTML tags and scripts. "
            "Useful for reading documentation or articles found via web_search."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to fetch.",
                },
            },
            "required": ["url"],
        },
    },
]