# mcp_client.py
"""
Note(Gemini):
MCP-style file context tools for manual_slop.

Exposes read-only filesystem tools the AI can call to selectively fetch file
content on demand, instead of having everything inlined into the context block.

All access is restricted to paths that are either:
- Explicitly listed in the project's allowed_paths set, OR
- Contained within an allowed base_dir (must resolve to a subpath of it)

Tools exposed:
    read_file(path)             - return full UTF-8 content of a file
    list_directory(path)        - list entries in a directory (names + type)
    search_files(path, pattern) - glob pattern search within an allowed dir
    get_file_summary(path)      - return the summarize.py heuristic summary
    web_search(query)           - DuckDuckGo HTML search, top results
    fetch_url(url)              - fetch a URL and return its tag-stripped text

This is heavily inspired by Claude's own tooling limits. We enforce safety here
so the AI doesn't wander outside the project workspace.
"""

from pathlib import Path
import summarize
import urllib.request
import urllib.parse
from html.parser import HTMLParser
import re as _re


# ------------------------------------------------------------------ state

# Set by configure() before the AI send loop starts.
# allowed_paths : set of resolved absolute Path objects (files or dirs)
# base_dirs     : set of resolved absolute Path dirs that act as roots
_allowed_paths: set[Path] = set()
_base_dirs: set[Path] = set()


def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None):
    """
    Build the allowlist from aggregate file_items.
    Called by ai_client before each send so the list reflects the current project.

    file_items      : list of dicts from aggregate.build_file_items()
    extra_base_dirs : additional directory roots to allow traversal of
    """
    global _allowed_paths, _base_dirs
    _allowed_paths = set()
    _base_dirs = set()

    for item in file_items:
        p = item.get("path")
        if p is not None:
            rp = Path(p).resolve()
            _allowed_paths.add(rp)
            _base_dirs.add(rp.parent)

    if extra_base_dirs:
        for d in extra_base_dirs:
            dp = Path(d).resolve()
            if dp.is_dir():
                _base_dirs.add(dp)
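
# Usage sketch (hypothetical values; the real file_items come from
# aggregate.build_file_items(), whose exact shape is assumed here):
#
#   configure(
#       [{"path": "/home/user/project/main.py"}],
#       extra_base_dirs=["/home/user/project/docs"],
#   )
#   # main.py is now explicitly allowed, and /home/user/project becomes a
#   # base dir (each file's parent is added), so sibling files are readable too.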


def _is_allowed(path: Path) -> bool:
    """
    Return True if `path` is within the allowlist.
    A path is allowed if:
    - it is explicitly in _allowed_paths, OR
    - it is contained within (or equal to) one of the _base_dirs
    """
    rp = path.resolve()
    if rp in _allowed_paths:
        return True
    for bd in _base_dirs:
        try:
            rp.relative_to(bd)
            return True
        except ValueError:
            continue
    return False
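
# For example, with _base_dirs = {Path("/home/user/project")} (illustrative):
#   _is_allowed(Path("/home/user/project/src/app.py"))  -> True, under a base dir
#   _is_allowed(Path("/etc/passwd"))                    -> False, outside all roots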


def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
    """
    Resolve raw_path and verify it passes the allowlist check.
    Returns (resolved_path, error_string). error_string is empty on success.
    """
    try:
        p = Path(raw_path).resolve()
    except Exception as e:
        return None, f"ERROR: invalid path '{raw_path}': {e}"
    if not _is_allowed(p):
        return None, (
            f"ACCESS DENIED: '{raw_path}' is not within the allowed paths. "
            f"Use list_directory or search_files on an allowed base directory first."
        )
    return p, ""
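
# Contract sketch (illustrative): every tool below opens with the same pattern,
#   p, err = _resolve_and_check(path)
#   if err:
#       return err   # surface the ERROR / ACCESS DENIED string directly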


# ------------------------------------------------------------------ tool implementations

def read_file(path: str) -> str:
    """Return the UTF-8 content of a file, or an error string."""
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.exists():
        return f"ERROR: file not found: {path}"
    if not p.is_file():
        return f"ERROR: not a file: {path}"
    try:
        return p.read_text(encoding="utf-8")
    except Exception as e:
        return f"ERROR reading '{path}': {e}"


def list_directory(path: str) -> str:
    """List entries in a directory. Returns a compact text table."""
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.exists():
        return f"ERROR: path not found: {path}"
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
        lines = [f"Directory: {p}", ""]
        for entry in entries:
            kind = "file" if entry.is_file() else "dir "
            size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
            lines.append(f" [{kind}] {entry.name:<40} {size}")
        lines.append(f" ({len(entries)} entries)")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR listing '{path}': {e}"


def search_files(path: str, pattern: str) -> str:
    """
    Search for files matching a glob pattern within path.
    pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        matches = sorted(p.glob(pattern))
        if not matches:
            return f"No files matched '{pattern}' in {path}"
        lines = [f"Search '{pattern}' in {p}:", ""]
        for m in matches:
            rel = m.relative_to(p)
            kind = "file" if m.is_file() else "dir "
            lines.append(f" [{kind}] {rel}")
        lines.append(f" ({len(matches)} match(es))")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching '{path}': {e}"


def get_file_summary(path: str) -> str:
    """
    Return the heuristic summary for a file (same as the initial context block).
    For .py files: imports, classes, methods, functions, constants.
    For .toml: table keys. For .md: headings. Others: line count + preview.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.exists():
        return f"ERROR: file not found: {path}"
    if not p.is_file():
        return f"ERROR: not a file: {path}"
    try:
        content = p.read_text(encoding="utf-8")
        return summarize.summarise_file(p, content)
    except Exception as e:
        return f"ERROR summarising '{path}': {e}"


# ------------------------------------------------------------------ web tools

class _DDGParser(HTMLParser):
    def __init__(self):
        super().__init__()
        self.results = []
        self.in_result = False
        self.in_title = False
        self.in_snippet = False
        self.current_link = ""
        self.current_title = ""
        self.current_snippet = ""

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "a" and "result__url" in attrs.get("class", ""):
            self.current_link = attrs.get("href", "")
        if tag == "a" and "result__snippet" in attrs.get("class", ""):
            self.in_snippet = True
        if tag == "h2" and "result__title" in attrs.get("class", ""):
            self.in_title = True

    def handle_endtag(self, tag):
        if tag == "h2" and self.in_title:
            self.in_title = False
        if tag == "a" and self.in_snippet:
            self.in_snippet = False
            # The snippet anchor is the last of the three fields in a DDG
            # result block, so a complete result can be flushed once it closes.
            if self.current_link:
                self.results.append({
                    "title": self.current_title.strip(),
                    "link": self.current_link,
                    "snippet": self.current_snippet.strip(),
                })
            self.current_title = ""
            self.current_snippet = ""
            self.current_link = ""

    def handle_data(self, data):
        if self.in_title:
            self.current_title += data
        if self.in_snippet:
            self.current_snippet += data


class _TextExtractor(HTMLParser):
    def __init__(self):
        super().__init__()
        self.text = []
        self.hide = 0
        # 'meta' is deliberately excluded: it is a void element, so HTMLParser
        # never reports a matching end tag and the hide counter would get stuck.
        self.ignore_tags = {'script', 'style', 'head', 'nav', 'header', 'footer', 'noscript', 'svg'}

    def handle_starttag(self, tag, attrs):
        if tag in self.ignore_tags:
            self.hide += 1

    def handle_endtag(self, tag):
        if tag in self.ignore_tags:
            # Guard against malformed HTML with unbalanced close tags.
            self.hide = max(0, self.hide - 1)

    def handle_data(self, data):
        if self.hide == 0:
            cleaned = data.strip()
            if cleaned:
                self.text.append(cleaned)


def web_search(query: str) -> str:
    """Search the web using DuckDuckGo HTML and return top results."""
    url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore')
        parser = _DDGParser()
        parser.feed(html)
        if not parser.results:
            return f"No results found for '{query}'"
        lines = [f"Search Results for '{query}':"]
        for i, r in enumerate(parser.results[:5], 1):
            lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching web for '{query}': {e}"


def fetch_url(url: str) -> str:
    """Fetch a URL and return its text content (stripped of HTML tags)."""
    # Unwrap DuckDuckGo redirect links if one was passed through from web_search.
    if url.startswith("//duckduckgo.com/l/?uddg="):
        url = urllib.parse.unquote(url.split("uddg=")[1].split("&")[0])

    if not url.startswith("http"):
        url = "https://" + url

    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
    try:
        html = urllib.request.urlopen(req, timeout=10).read().decode('utf-8', errors='ignore')
        parser = _TextExtractor()
        parser.feed(html)
        full_text = " ".join(parser.text)
        full_text = _re.sub(r'\s+', ' ', full_text)
        # Cap at 40k chars to prevent context blowup.
        if len(full_text) > 40000:
            return full_text[:40000] + "\n... (content truncated)"
        return full_text
    except Exception as e:
        return f"ERROR fetching URL '{url}': {e}"


# ------------------------------------------------------------------ tool dispatch

TOOL_NAMES = {"read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"}


def dispatch(tool_name: str, tool_input: dict) -> str:
    """
    Dispatch an MCP tool call by name. Returns the result as a string.
    """
    if tool_name == "read_file":
        return read_file(tool_input.get("path", ""))
    if tool_name == "list_directory":
        return list_directory(tool_input.get("path", ""))
    if tool_name == "search_files":
        return search_files(tool_input.get("path", ""), tool_input.get("pattern", "*"))
    if tool_name == "get_file_summary":
        return get_file_summary(tool_input.get("path", ""))
    if tool_name == "web_search":
        return web_search(tool_input.get("query", ""))
    if tool_name == "fetch_url":
        return fetch_url(tool_input.get("url", ""))
    return f"ERROR: unknown MCP tool '{tool_name}'"


# ------------------------------------------------------------------ tool schema helpers
# These are imported by ai_client.py to build provider-specific declarations.

MCP_TOOL_SPECS = [
    {
        "name": "read_file",
        "description": (
            "Read the full UTF-8 content of a file within the allowed project paths. "
            "Use get_file_summary first to decide whether you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to read.",
                }
            },
            "required": ["path"],
        },
    },
    {
        "name": "list_directory",
        "description": (
            "List files and subdirectories within an allowed directory. "
            "Shows name, type (file/dir), and size. Use this to explore the project structure."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to list.",
                }
            },
            "required": ["path"],
        },
    },
    {
        "name": "search_files",
        "description": (
            "Search for files matching a glob pattern within an allowed directory. "
            "Supports recursive patterns like '**/*.py'. "
            "Use this to find files by extension or name pattern."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to search within.",
                },
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.",
                },
            },
            "required": ["path", "pattern"],
        },
    },
    {
        "name": "get_file_summary",
        "description": (
            "Get a compact heuristic summary of a file without reading its full content. "
            "For Python: imports, classes, methods, functions, constants. "
            "For TOML: table keys. For Markdown: headings. Others: line count + preview. "
            "Use this before read_file to decide if you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to summarise.",
                }
            },
            "required": ["path"],
        },
    },
    {
        "name": "web_search",
        "description": (
            "Search the web using DuckDuckGo. Returns the top 5 search results with "
            "titles, URLs, and snippets. Chain this with fetch_url to read specific pages."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query.",
                }
            },
            "required": ["query"],
        },
    },
    {
        "name": "fetch_url",
        "description": (
            "Fetch a webpage and extract its text content, removing HTML tags and scripts. "
            "Useful for reading documentation or articles found via web_search."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to fetch.",
                }
            },
            "required": ["url"],
        },
    },
]
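

# The specs above already match the {"name", "description", "parameters"} shape
# Gemini expects for function declarations. Other providers re-key the same
# data; the helper below is a minimal sketch for Anthropic's Messages API,
# which names the JSON schema field "input_schema". It is illustrative and
# not a confirmed part of ai_client.py's interface.
def as_anthropic_tools() -> list[dict]:
    """Re-key MCP_TOOL_SPECS into Anthropic-style tool declarations."""
    return [
        {
            "name": spec["name"],
            "description": spec["description"],
            "input_schema": spec["parameters"],
        }
        for spec in MCP_TOOL_SPECS
    ]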