# mcp_client.py
#
# MCP-style file context tools for manual_slop.
#
# Exposes read-only filesystem tools the AI can call to selectively fetch file
# content on demand, instead of having everything inlined into the context block.
#
# All access is restricted to paths that are either:
#   - Explicitly listed in the project's allowed_paths set, OR
#   - Contained within an allowed base_dir (must resolve to a subpath of it)
#
# Tools exposed:
#   read_file(path)              - return full UTF-8 content of a file
#   list_directory(path)         - list entries in a directory (name, type, size)
#   search_files(path, pattern)  - glob pattern search within an allowed dir
#   get_file_summary(path)       - return the summarize.py heuristic summary
#
|
|
|
|
from pathlib import Path
|
|
import summarize
|
|
|
|
# ------------------------------------------------------------------ state
|
|
|
|
# Set by configure() before the AI send loop starts.
|
|
# allowed_paths : set of resolved absolute Path objects (files or dirs)
|
|
# base_dirs : set of resolved absolute Path dirs that act as roots
|
|
# Both sets start empty, so nothing is accessible until configure() has run;
# configure() rebinds them wholesale on every call.
_allowed_paths: set[Path] = set()  # individually granted, resolved paths
_base_dirs: set[Path] = set()  # resolved directory roots; descendants pass too
|
|
|
|
|
|
def configure(file_items: list[dict], extra_base_dirs: list[str] | None = None):
|
|
"""
|
|
Build the allowlist from aggregate file_items.
|
|
Called by ai_client before each send so the list reflects the current project.
|
|
|
|
file_items : list of dicts from aggregate.build_file_items()
|
|
extra_base_dirs : additional directory roots to allow traversal of
|
|
"""
|
|
global _allowed_paths, _base_dirs
|
|
_allowed_paths = set()
|
|
_base_dirs = set()
|
|
|
|
for item in file_items:
|
|
p = item.get("path")
|
|
if p is not None:
|
|
rp = Path(p).resolve()
|
|
_allowed_paths.add(rp)
|
|
_base_dirs.add(rp.parent)
|
|
|
|
if extra_base_dirs:
|
|
for d in extra_base_dirs:
|
|
dp = Path(d).resolve()
|
|
if dp.is_dir():
|
|
_base_dirs.add(dp)
|
|
|
|
|
|
def _is_allowed(path: Path) -> bool:
    """
    Tell whether `path` may be accessed under the current allowlist.

    The path is resolved first. It passes when the resolved form
      - is a member of _allowed_paths, OR
      - equals, or lies beneath, any directory in _base_dirs.
    """
    candidate = path.resolve()
    return candidate in _allowed_paths or any(
        candidate.is_relative_to(root) for root in _base_dirs
    )
|
|
|
|
|
|
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
|
|
"""
|
|
Resolve raw_path and verify it passes the allowlist check.
|
|
Returns (resolved_path, error_string). error_string is empty on success.
|
|
"""
|
|
try:
|
|
p = Path(raw_path).resolve()
|
|
except Exception as e:
|
|
return None, f"ERROR: invalid path '{raw_path}': {e}"
|
|
if not _is_allowed(p):
|
|
return None, (
|
|
f"ACCESS DENIED: '{raw_path}' is not within the allowed paths. "
|
|
f"Use list_directory or search_files on an allowed base directory first."
|
|
)
|
|
return p, ""
|
|
|
|
|
|
# ------------------------------------------------------------------ tool implementations
|
|
|
|
def read_file(path: str) -> str:
    """
    Return the UTF-8 text of the file at `path`.

    Never raises for the failures handled here: an allowlist rejection, a
    missing path, a non-file path, or a read/decode failure each come back as
    a human-readable error string instead.
    """
    target, denial = _resolve_and_check(path)
    if denial:
        return denial
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
    except Exception as exc:
        return f"ERROR reading '{path}': {exc}"
    return text
|
|
|
|
|
|
def list_directory(path: str) -> str:
    """
    Produce a compact text listing of the directory at `path`.

    Entries that are not regular files come first, then files; each group is
    ordered case-insensitively by name. Files also show their size in bytes.
    Allowlist rejections, missing paths, non-directories and listing failures
    are reported as error strings rather than raised.
    """
    folder, denial = _resolve_and_check(path)
    if denial:
        return denial
    if not folder.exists():
        return f"ERROR: path not found: {path}"
    if not folder.is_dir():
        return f"ERROR: not a directory: {path}"

    def _dirs_first(entry):
        # False sorts before True, so non-files precede files.
        return (entry.is_file(), entry.name.lower())

    try:
        children = list(folder.iterdir())
        children.sort(key=_dirs_first)
        rows = [f"Directory: {folder}", ""]
        for child in children:
            if child.is_file():
                kind = "file"
                size = f"{child.stat().st_size:>10,} bytes"
            else:
                kind = "dir "
                size = ""
            rows.append(f" [{kind}] {child.name:<40} {size}")
        rows.append(f" ({len(children)} entries)")
        return "\n".join(rows)
    except Exception as exc:
        return f"ERROR listing '{path}': {exc}"
|
|
|
|
|
|
def search_files(path: str, pattern: str) -> str:
    """
    Search for entries matching a glob pattern within path.

    path    : directory to search in; must pass the allowlist check
    pattern : glob pattern relative to `path`,
              e.g. '*.py', '**/*.toml', 'src/**/*.rs'

    Returns a text listing of the matches (shown relative to `path`), a
    "No files matched" message, or an error / access-denied string.
    Matches that resolve outside the allowlist are left out of the result.
    """
    p, err = _resolve_and_check(path)
    if err:
        return err
    if not p.is_dir():
        return f"ERROR: not a directory: {path}"
    try:
        # The pattern is caller-supplied: '..' segments or symlinks can make
        # glob yield entries outside the allowed roots. Only the search root
        # was vetted above, so every hit is re-checked before it is revealed.
        matches = sorted(m for m in p.glob(pattern) if _is_allowed(m))
        if not matches:
            return f"No files matched '{pattern}' in {path}"
        lines = [f"Search '{pattern}' in {p}:", ""]
        for m in matches:
            rel = m.relative_to(p)
            kind = "file" if m.is_file() else "dir "
            lines.append(f" [{kind}] {rel}")
        lines.append(f" ({len(matches)} match(es))")
        return "\n".join(lines)
    except Exception as e:
        return f"ERROR searching '{path}': {e}"
|
|
|
|
|
|
def get_file_summary(path: str) -> str:
    """
    Return the heuristic summary for a file (same as the initial context block).

    The file is read as UTF-8 and handed to summarize.summarise_file together
    with its resolved path:
      .py    -> imports, classes, methods, functions, constants
      .toml  -> table keys
      .md    -> headings
      others -> line count + preview

    Allowlist rejections, missing paths, non-files and read/summarise failures
    are reported as error strings rather than raised.
    """
    target, denial = _resolve_and_check(path)
    if denial:
        return denial
    if not target.exists():
        return f"ERROR: file not found: {path}"
    if not target.is_file():
        return f"ERROR: not a file: {path}"
    try:
        text = target.read_text(encoding="utf-8")
        summary = summarize.summarise_file(target, text)
    except Exception as exc:
        return f"ERROR summarising '{path}': {exc}"
    return summary
|
|
|
|
|
|
# ------------------------------------------------------------------ tool dispatch
|
|
|
|
# Names of the tools that dispatch() routes to an implementation.
TOOL_NAMES = {
    "read_file",
    "list_directory",
    "search_files",
    "get_file_summary",
}
|
|
|
|
|
|
def dispatch(tool_name: str, tool_input: dict) -> str:
|
|
"""
|
|
Dispatch an MCP tool call by name. Returns the result as a string.
|
|
"""
|
|
if tool_name == "read_file":
|
|
return read_file(tool_input.get("path", ""))
|
|
if tool_name == "list_directory":
|
|
return list_directory(tool_input.get("path", ""))
|
|
if tool_name == "search_files":
|
|
return search_files(tool_input.get("path", ""), tool_input.get("pattern", "*"))
|
|
if tool_name == "get_file_summary":
|
|
return get_file_summary(tool_input.get("path", ""))
|
|
return f"ERROR: unknown MCP tool '{tool_name}'"
|
|
|
|
|
|
# ------------------------------------------------------------------ tool schema helpers
|
|
# These are imported by ai_client.py to build provider-specific declarations.
|
|
|
|
# One entry per tool: a name, a human-readable description, and a JSON-schema
# style "parameters" object describing the arguments the tool accepts.
MCP_TOOL_SPECS = [
    {
        "name": "read_file",
        "description": (
            "Read the full UTF-8 content of a file within the allowed project paths. "
            "Use get_file_summary first to decide whether you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to read.",
                },
            },
            "required": ["path"],
        },
    },
    {
        "name": "list_directory",
        "description": (
            "List files and subdirectories within an allowed directory. "
            "Shows name, type (file/dir), and size. Use this to explore the project structure."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to list.",
                },
            },
            "required": ["path"],
        },
    },
    {
        "name": "search_files",
        "description": (
            "Search for files matching a glob pattern within an allowed directory. "
            "Supports recursive patterns like '**/*.py'. "
            "Use this to find files by extension or name pattern."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the directory to search within.",
                },
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern, e.g. '*.py', '**/*.toml', 'src/**/*.rs'.",
                },
            },
            "required": ["path", "pattern"],
        },
    },
    {
        "name": "get_file_summary",
        "description": (
            "Get a compact heuristic summary of a file without reading its full content. "
            "For Python: imports, classes, methods, functions, constants. "
            "For TOML: table keys. For Markdown: headings. Others: line count + preview. "
            "Use this before read_file to decide if you need the full content."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative path to the file to summarise.",
                },
            },
            "required": ["path"],
        },
    },
]
|