Files
manual_slop/mcp_client.py
2026-02-21 23:17:42 -05:00

557 lines
19 KiB
Python

# mcp_client.py
# MCP-style context tools for manual_slop.
# Exposes read-only filesystem tools the AI can call to selectively fetch file
# content on demand, instead of having everything inlined into the context block.
# All filesystem access is restricted to paths that are either:
#   - Explicitly listed in the project's allowed_paths set, OR
#   - Contained within an allowed base_dir (must resolve to a subpath of it)
# Tools exposed:
#   read_file(path)             - return full UTF-8 content of a file
#   list_directory(path)        - list entries in a directory (names + type)
#   search_files(path, pattern) - glob pattern search within an allowed dir
#   get_file_summary(path)      - return the summarize.py heuristic summary
#   web_search(query)           - DuckDuckGo HTML search, top 5 results
#   fetch_url(url)              - fetch a web page and return its visible text
# The two web tools perform network requests and are not subject to the path
# allowlist above.
#
from pathlib import Path
import summarize
import urllib.request
import urllib.parse
from html.parser import HTMLParser
import re as _re
# ------------------------------------------------------------------ state
# Module-level allowlist consulted by _is_allowed().
# Set by configure() before the AI send loop starts; configure() replaces both
# sets wholesale on every call, so they always describe a single project.
# _allowed_paths : set of resolved absolute Path objects (files or dirs)
# _base_dirs     : set of resolved absolute Path dirs that act as roots;
#                  any path equal to or beneath one of them is allowed
_allowed_paths: set[Path] = set()
_base_dirs: set[Path] = set()
def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None):
"""
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
"""
global _allowed_paths, _base_dirs
_allowed_paths = set()
_base_dirs = set()
for item in file_items:
p = item.get("path")
if p is not None:
rp = Path(p).resolve()
_allowed_paths.add(rp)
_base_dirs.add(rp.parent)
if extra_base_dirs:
for d in extra_base_dirs:
dp = Path(d).resolve()
if dp.is_dir():
_base_dirs.add(dp)
def _is_allowed(path: Path) -> bool:
    """
    Tell whether `path` falls inside the current allowlist.

    The path is resolved first, then accepted when either:
    - the resolved path is a member of _allowed_paths, OR
    - the resolved path equals, or lies beneath, one of the _base_dirs
    """
    resolved = path.resolve()
    if resolved in _allowed_paths:
        return True
    return any(resolved.is_relative_to(root) for root in _base_dirs)
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
    """
    Turn raw_path into a resolved Path and run it through the allowlist.

    Returns a (resolved_path, error_string) pair. On success the error string
    is empty; on failure the path is None and the string explains why.
    """
    try:
        candidate = Path(raw_path).resolve()
    except Exception as exc:
        return None, f"ERROR: invalid path '{raw_path}': {exc}"

    if _is_allowed(candidate):
        return candidate, ""

    denial = (
        f"ACCESS DENIED: '{raw_path}' is not within the allowed paths. "
        "Use list_directory or search_files on an allowed base directory first."
    )
    return None, denial
# ------------------------------------------------------------------ tool implementations
def read_file(path: str) -> str:
    """Read an allowed file as UTF-8 text; any failure comes back as an error string."""
    target, problem = _resolve_and_check(path)
    if problem:
        return problem
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
    except Exception as exc:
        return f"ERROR reading '{path}': {exc}"
    return text
def list_directory(path: str) -> str:
    """
    List entries in a directory. Returns a compact text table.

    The path must pass the allowlist check. Directories are listed before
    files, each group sorted case-insensitively by name; files also show their
    size in bytes. Any failure is returned as an error string rather than raised.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.exists():
        return f"ERROR: path not found: {path}"
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        # (False, name) sorts before (True, name): non-files first, then files.
        entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
        lines = [f"Directory: {p}", ""]
        for entry in entries:
            is_file = entry.is_file()
            kind = "file" if is_file else "dir "
            size = f"{entry.stat().st_size:>10,} bytes" if is_file else ""
            lines.append(f" [{kind}] {entry.name:<40} {size}")
        lines.append(f" ({len(entries)} entries)")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR listing '{path}': {e}"
def search_files(path: str, pattern: str) -> str:
    """
    Search for files matching a glob pattern within path.

    pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'

    The directory must pass the allowlist check, and every individual match is
    re-checked against the allowlist before it is reported. Any failure
    (including an invalid pattern) is returned as an error string.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        # The pattern is supplied by the model. Segments such as '..' or
        # symlinked entries can make glob() yield paths outside the allowed
        # roots, so filter each hit instead of trusting the starting directory.
        matches = sorted(m for m in p.glob(pattern) if _is_allowed(m))
        if not matches:
            return f"No files matched '{pattern}' in {path}"
        lines = [f"Search '{pattern}' in {p}:", ""]
        for m in matches:
            rel = m.relative_to(p)
            kind = "file" if m.is_file() else "dir "
            lines.append(f" [{kind}] {rel}")
        lines.append(f" ({len(matches)} match(es))")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching '{path}': {e}"
def get_file_summary(path: str) -> str:
    """
    Produce the heuristic summary of a file (same as the initial context block).

    For .py files: imports, classes, methods, functions, constants.
    For .toml: table keys. For .md: headings. Others: line count + preview.
    Failures are reported as an error string instead of being raised.
    """
    target, problem = _resolve_and_check(path)
    if problem:
        return problem
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
        return summarize.summarise_file(target, text)
    except Exception as exc:
        return f"ERROR summarising '{path}': {exc}"
# ------------------------------------------------------------------ web tools
class _DDGParser(HTMLParser):
    """
    Collect search results from a DuckDuckGo HTML results page.

    After feed(), `results` holds one dict per result with the keys
    "title", "link" and "snippet".

    NOTE(review): a result is only emitted when a result__title <h2> closes
    and a result__url link has already been seen, so this assumes the link
    precedes the end of the title in the markup -- confirm against live
    DuckDuckGo HTML.
    """

    def __init__(self):
        super().__init__()
        # Completed results, in document order.
        self.results: list[dict[str, str]] = []
        self.in_result = False
        # True while inside a title <h2> / a snippet <a>, respectively.
        self.in_title = False
        self.in_snippet = False
        # Fields of the result currently being assembled.
        self.current_link = ""
        self.current_title = ""
        self.current_snippet = ""

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        # A valueless attribute (e.g. `<a class>`) is reported as None, which
        # would make the `in` checks below raise TypeError.
        css_class = attrs.get("class") or ""
        if tag == "a" and "result__url" in css_class:
            self.current_link = attrs.get("href") or ""
        if tag == "a" and "result__snippet" in css_class:
            self.in_snippet = True
        if tag == "h2" and "result__title" in css_class:
            self.in_title = True

    def handle_endtag(self, tag):
        if tag == "a" and self.in_snippet:
            self.in_snippet = False
        if tag == "h2" and self.in_title:
            self.in_title = False
            # Only emit (and reset) once a link has been captured.
            if self.current_link:
                self.results.append({
                    "title": self.current_title.strip(),
                    "link": self.current_link,
                    "snippet": self.current_snippet.strip(),
                })
                self.current_title = ""
                self.current_snippet = ""
                self.current_link = ""

    def handle_data(self, data):
        if self.in_title:
            self.current_title += data
        if self.in_snippet:
            self.current_snippet += data
class _TextExtractor(HTMLParser):
    """
    Collect the visible text of an HTML document.

    After feed(), `text` is the list of non-empty, stripped text chunks found
    outside the elements named in `ignore_tags`.
    """

    # Void elements never get an end tag, so they must not change the depth.
    _VOID_TAGS = frozenset({
        "area", "base", "br", "col", "embed", "hr", "img",
        "input", "link", "meta", "source", "track", "wbr",
    })

    def __init__(self):
        super().__init__()
        # Extracted text chunks, in document order.
        self.text: list[str] = []
        # Nesting depth of currently open ignored elements; 0 means "visible".
        self.hide = 0
        self.ignore_tags = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'}

    def _tracks_depth(self, tag):
        """Return True if `tag` opens/closes an ignored, non-void element."""
        return tag in self.ignore_tags and tag not in self._VOID_TAGS

    def handle_starttag(self, tag, attrs):
        # Counting a void tag such as <meta> would leave `hide` above zero
        # forever and suppress every piece of text that follows it.
        if self._tracks_depth(tag):
            self.hide += 1

    def handle_endtag(self, tag):
        # Never go negative: a stray closing tag must not hide later text.
        if self._tracks_depth(tag) and self.hide > 0:
            self.hide -= 1

    def handle_data(self, data):
        if self.hide == 0:
            cleaned = data.strip()
            if cleaned:
                self.text.append(cleaned)
def web_search(query: str) -> str:
    """
    Search the web using DuckDuckGo HTML and return top results.

    Returns a text listing of at most 5 results (title, URL, snippet), a
    "No results found" message, or an error string if the request or parsing
    fails.
    """
    url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            html = resp.read().decode('utf-8', errors='ignore')
        parser = _DDGParser()
        parser.feed(html)
        if not parser.results:
            return f"No results found for '{query}'"
        lines = [f"Search Results for '{query}':\n"]
        for i, r in enumerate(parser.results[:5], 1):
            lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching web for '{query}': {e}"
def fetch_url(url: str) -> str:
    """
    Fetch a URL and return its text content (stripped of HTML tags).

    DuckDuckGo redirect links are unwrapped, protocol-relative and scheme-less
    URLs are upgraded to https, and the extracted text is capped at 40,000
    characters. Every failure is returned as an error string, never raised.
    """
    url = (url or "").strip()
    if not url:
        return f"ERROR fetching URL '{url}': empty URL"
    # Correct duckduckgo redirect links if passed
    if url.startswith("//duckduckgo.com/l/?uddg="):
        url = urllib.parse.unquote(url.split("uddg=")[1].split("&")[0])
    if url.startswith("//"):
        # Protocol-relative link: only the scheme is missing.
        url = "https:" + url
    elif not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    try:
        # Request() itself can raise on a malformed URL, so it belongs in the try.
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read()
            charset = resp.headers.get_content_charset() or "utf-8"
        try:
            html = raw.decode(charset, errors='ignore')
        except LookupError:
            # Server declared a charset Python does not know; fall back to UTF-8.
            html = raw.decode('utf-8', errors='ignore')
        parser = _TextExtractor()
        parser.feed(html)
        full_text = " ".join(parser.text)
        full_text = _re.sub(r'\s+', ' ', full_text)
        # Limit to 40k chars to prevent context blowup
        if len(full_text) > 40000:
            return full_text[:40000] + "\n... (content truncated)"
        return full_text
    except Exception as e:
        return f"ERROR fetching URL '{url}': {e}"
# ------------------------------------------------------------------ tool dispatch
# Names of every tool that dispatch() knows how to route.
TOOL_NAMES = {
    "read_file",
    "list_directory",
    "search_files",
    "get_file_summary",
    "web_search",
    "fetch_url",
}
def dispatch(tool_name: str, tool_input: dict) -> str:
    """
    Dispatch an MCP tool call by name. Returns the result as a string.

    tool_name  : one of the tool names handled below
    tool_input : the tool's arguments; None is treated as "no arguments" so a
                 call without an argument object yields an error string from
                 the tool instead of an AttributeError here

    Unknown tool names produce an "ERROR: unknown MCP tool" string.
    """
    args = tool_input or {}
    if tool_name == "read_file":
        return read_file(args.get("path", ""))
    if tool_name == "list_directory":
        return list_directory(args.get("path", ""))
    if tool_name == "search_files":
        return search_files(args.get("path", ""), args.get("pattern", "*"))
    if tool_name == "get_file_summary":
        return get_file_summary(args.get("path", ""))
    if tool_name == "web_search":
        return web_search(args.get("query", ""))
    if tool_name == "fetch_url":
        return fetch_url(args.get("url", ""))
    return f"ERROR: unknown MCP tool '{tool_name}'"
# ------------------------------------------------------------------ tool schema helpers
# These are imported by ai_client.py to build provider-specific declarations.
def _string_param(description: str) -> dict:
    """Build the schema fragment for one string-typed tool parameter."""
    return {"type": "string", "description": description}


def _tool_spec(name: str, description: str, params: dict[str, str]) -> dict:
    """
    Build one tool declaration.

    `params` maps parameter name -> description; every parameter is a string
    and every parameter is required, listed in the order given.
    """
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {key: _string_param(text) for key, text in params.items()},
            "required": list(params),
        },
    }


# Provider-neutral declarations for every tool, in presentation order.
MCP_TOOL_SPECS = [
    _tool_spec(
        "read_file",
        "Read the full UTF-8 content of a file within the allowed project paths. "
        "Use get_file_summary first to decide whether you need the full content.",
        {"path": "Absolute or relative path to the file to read."},
    ),
    _tool_spec(
        "list_directory",
        "List files and subdirectories within an allowed directory. "
        "Shows name, type (file/dir), and size. Use this to explore the project structure.",
        {"path": "Absolute path to the directory to list."},
    ),
    _tool_spec(
        "search_files",
        "Search for files matching a glob pattern within an allowed directory. "
        "Supports recursive patterns like '**/*.py'. "
        "Use this to find files by extension or name pattern.",
        {
            "path": "Absolute path to the directory to search within.",
            "pattern": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.",
        },
    ),
    _tool_spec(
        "get_file_summary",
        "Get a compact heuristic summary of a file without reading its full content. "
        "For Python: imports, classes, methods, functions, constants. "
        "For TOML: table keys. For Markdown: headings. Others: line count + preview. "
        "Use this before read_file to decide if you need the full content.",
        {"path": "Absolute or relative path to the file to summarise."},
    ),
    _tool_spec(
        "web_search",
        "Search the web using DuckDuckGo. "
        "Returns the top 5 search results with titles, URLs, and snippets. "
        "Chain this with fetch_url to read specific pages.",
        {"query": "The search query."},
    ),
    _tool_spec(
        "fetch_url",
        "Fetch a webpage and extract its text content, removing HTML tags and scripts. "
        "Useful for reading documentation or articles found via web_search.",
        {"url": "The URL to fetch."},
    ),
]