Private
Public Access
0
0
Files
manual_slop/src/mcp_client.py
T
ed b3aeaa4376 fix(post_de_cruft_iter2): fix 3 pre-existing test failures + lazy tomli_w imports
1. tier-1-unit-core::test_audit_script_exits_zero
   - audit_main_thread_imports.py failed with 3 heavy top-level imports
   - Made tomli_w lazy in src/personas.py, src/tool_presets.py, src/workspace_manager.py
   - Made 'from scripts import py_struct_tools' lazy inside src/mcp_client.py:dispatch()
   - Audit now exits 0 (28 files in main-thread import graph, no heavy top-level imports)

2. tier-2-mock-app-headless::test_status_endpoint_authorized
   - /status endpoint goes through _api_status() which returns controller.ai_status (default 'idle'),
     not the literal 'ok' string the test expected
   - Updated test to expect 'idle' (the actual ai_status default for a fresh controller)

3. tier-3-live_gui::test_auto_switch_sim
   - _capture_workspace_profile() in src/gui_2.py referenced 'WorkspaceProfile' as a bare name,
     but the module had only 'from src import workspace_manager' (the module, not the class)
   - Added 'from src.workspace_manager import WorkspaceProfile' to fix the NameError
   - Profile save/load round-trip now works; auto-switch fires Tier 3 bound profile

Additional test fixes (uncovered by full run):
- tests/test_cruft_removal.py: patch 'src.mcp_client.py_struct_tools' no longer works
  (lazy import means the attribute doesn't exist). Patched 'scripts.py_struct_tools.py_remove_def'
  and '.py_move_def' directly at the source module.
- tests/test_command_palette_sim.py: 'from src.command_palette' was deleted in
  module_taxonomy_refactor; updated to 'from src.commands' (which now hosts _close_palette,
  _execute, and Command after the merge).

Production fix:
- src/presets.py:save_preset now raises ValueError when scope='project' but
  project_root is None (fail-fast per error_handling.md, prevents silent
  write to '.').

Type registry regenerated to reflect new line numbers.
2026-06-27 10:17:51 -04:00

2104 lines
88 KiB
Python

# mcp_client.py
"""
MCP Client - Multi-tool filesystem and network operations with sandboxing.
This module implements a Model Context Protocol (MCP)-like interface for AI
agents to interact with the filesystem and network. It provides 26 tools
with a three-layer security model to prevent unauthorized access.
Three-Layer Security Model:
1. Allowlist Construction (configure()):
- Builds _allowed_paths from project file_items
- Populates _base_dirs from file parents and extra_base_dirs
- Sets _primary_base_dir for relative path resolution
2. Path Validation (_is_allowed()):
- Blacklist check: history.toml, *_history.toml, config, credentials
- Explicit allowlist check: _allowed_paths membership
- CWD fallback: allows cwd() subpaths if no base_dirs configured
- Base directory containment: must be subpath of _base_dirs
3. Resolution Gate (_resolve_and_check()):
- Converts relative paths using _primary_base_dir
- Resolves symlinks to prevent traversal attacks
- Returns (resolved_path, error_message) tuple
Tool Categories:
- File I/O: read_file, list_directory, search_files, get_tree
- Surgical Edits: set_file_slice, edit_file
- AST-Based (Python): py_get_skeleton, py_get_code_outline, py_get_definition,
py_update_definition, py_get_signature, py_set_signature, py_get_class_summary,
py_get_var_declaration, py_set_var_declaration
- Analysis: get_file_summary, get_git_diff, py_find_usages, py_get_imports,
py_check_syntax, py_get_hierarchy, py_get_docstring, derive_code_path
- Network: web_search, fetch_url
- Runtime: get_ui_performance
Mutating Tools:
The MUTATING_TOOLS frozenset defines tools that modify files. ai_client.py
checks this set and routes to pre_tool_callback (GUI approval) if present.
Thread Safety:
This module uses module-level global state (_allowed_paths, _base_dirs).
Call configure() before dispatch() in multi-threaded environments.
See Also:
- docs/guide_tools.md for complete tool inventory and security model
- src/ai_client.py for tool dispatch integration
- src/shell_runner.py for PowerShell execution
"""
# mcp_client.py
#MCP-style file context tools for manual_slop.
from __future__ import annotations
import ast
import asyncio
import json
import os
import re as _re
import subprocess
import urllib.request
import urllib.parse
from dataclasses import dataclass, field
from html.parser import HTMLParser
from pathlib import Path
from typing import Dict, List, Optional, Callable, Any, cast
from src import beads_client
from src import outline_tool
from src import summarize
from src import mcp_tool_specs
from src.result_types import ErrorInfo, ErrorKind, NilPath, Result
from src.type_aliases import Metadata
# --------------------------------------------------------------------------- MCP config dataclasses
# Moved from src/models.py in module_taxonomy_refactor_20260627 Phase 3i.
# These are the data layer of the MCP subsystem; they belong here.
@dataclass
class MCPServerConfig:
name: str
command: Optional[str] = None
args: List[str] = field(default_factory=list)
url: Optional[str] = None
auto_start: bool = False
def to_dict(self) -> Metadata:
res = {'auto_start': self.auto_start}
if self.command: res['command'] = self.command
if self.args: res['args'] = self.args
if self.url: res['url'] = self.url
return res
@classmethod
def from_dict(cls, name: str, data: Metadata) -> "MCPServerConfig":
return cls(
name = name,
command = data.get('command'),
args = data.get('args', []),
url = data.get('url'),
auto_start = data.get('auto_start', False),
)
@dataclass
class MCPConfiguration:
mcpServers: Dict[str, MCPServerConfig] = field(default_factory=dict)
def to_dict(self) -> Metadata:
return {'mcpServers': {name: cfg.to_dict() for name, cfg in self.mcpServers.items()}}
@classmethod
def from_dict(cls, data: Metadata) -> "MCPConfiguration":
raw_servers = data.get('mcpServers', {})
parsed_servers = {name: MCPServerConfig.from_dict(name, cfg) for name, cfg in raw_servers.items()}
return cls(mcpServers=parsed_servers)
@dataclass
class VectorStoreConfig:
provider: str
url: Optional[str] = None
api_key: Optional[str] = None
collection_name: str = 'manual_slop'
mcp_server: Optional[str] = None
mcp_tool: Optional[str] = None
def to_dict(self) -> Metadata:
return {
"provider": self.provider,
"url": self.url,
"api_key": self.api_key,
"collection_name": self.collection_name,
"mcp_server": self.mcp_server,
"mcp_tool": self.mcp_tool,
}
@classmethod
def from_dict(cls, data: Metadata) -> "VectorStoreConfig":
return cls(
provider = data["provider"],
url = data.get("url"),
api_key = data.get("api_key"),
collection_name = data.get("collection_name", "manual_slop"),
mcp_server = data.get("mcp_server"),
mcp_tool = data.get("mcp_tool"),
)
@dataclass
class RAGConfig:
enabled: bool = False
vector_store: VectorStoreConfig = field(default_factory=lambda: VectorStoreConfig(provider='mock'))
embedding_provider: str = 'gemini'
chunk_size: int = 1000
chunk_overlap: int = 200
def to_dict(self) -> Metadata:
return {
"enabled": self.enabled,
"vector_store": self.vector_store.to_dict(),
"embedding_provider": self.embedding_provider,
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
}
@classmethod
def from_dict(cls, data: Metadata) -> "RAGConfig":
return cls(
enabled = data.get("enabled", False),
vector_store = VectorStoreConfig.from_dict(data.get("vector_store", {"provider": "mock"})),
embedding_provider = data.get("embedding_provider", "gemini"),
chunk_size = data.get("chunk_size", 1000),
chunk_overlap = data.get("chunk_overlap", 200),
)
def load_mcp_config(path: str) -> MCPConfiguration:
if not os.path.exists(path):
return MCPConfiguration()
with open(path, 'r', encoding='utf-8') as f:
try:
data = json.load(f)
return MCPConfiguration.from_dict(data)
except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
_mcp_err = Result(data=MCPConfiguration(), errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"failed to load MCP config: {e}", source="mcp_client.load_mcp_config", original=e)])
return _mcp_err.data
# ------------------------------------------------------------------ mutating tools sentinel
# Tools that write or modify files. ai_client checks this set before dispatch
# and routes to pre_tool_callback (GUI approval) if the tool name is present.
MUTATING_TOOLS: frozenset[str] = frozenset({
"set_file_slice",
"py_update_definition",
"py_set_signature",
"py_set_var_declaration",
"edit_file",
"ts_c_update_definition",
"ts_cpp_update_definition",
"py_remove_def",
"py_add_def",
"py_move_def",
"py_region_wrap",
})
# ------------------------------------------------------------------ state
# Set by configure() before the AI send loop starts.
# allowed_paths : set of resolved absolute Path objects (files or dirs)
# base_dirs : set of resolved absolute Path dirs that act as roots
_allowed_paths: set[Path] = set()
_base_dirs: set[Path] = set()
_primary_base_dir: Path | None = None
# Injected by gui_2.py - returns a dict of performance metrics
perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None
def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | None = None) -> None:
"""
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
[C: tests/conftest.py:reset_ai_client, tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration]
"""
global _allowed_paths, _base_dirs, _primary_base_dir
_allowed_paths = set()
_base_dirs = set()
_primary_base_dir = Path(extra_base_dirs[0]).resolve() if extra_base_dirs else Path.cwd()
for item in file_items:
p = item.get("path")
if p is not None:
try:
rp = Path(p).resolve(strict=True)
except (OSError, ValueError):
rp = Path(p).resolve()
_allowed_paths.add(rp)
_base_dirs.add(rp.parent)
if extra_base_dirs:
for d in extra_base_dirs:
dp = Path(d).resolve()
if dp.is_dir():
_base_dirs.add(dp)
def _is_allowed(path: Path) -> bool:
"""
Return True if `path` is within the allowlist.
A path is allowed if:
- it is explicitly in _allowed_paths, OR
- it is contained within (or equal to) one of the _base_dirs
All paths are resolved (follows symlinks) before comparison to prevent
symlink-based path traversal.
CRITICAL: Blacklisted files (history) are NEVER allowed.
[C: tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_history_management.py:test_mcp_blacklist]
"""
from src.paths import get_config_path, get_credentials_path
try:
rp = path.resolve(strict=True)
except (OSError, ValueError):
rp = path.resolve()
# Blacklist check by resolved path
if rp == get_config_path().resolve(): return False
if rp == get_credentials_path().resolve(): return False
name = path.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
return False
if rp in _allowed_paths:
return True
# Allow current working directory and subpaths by default if no base_dirs
cwd = Path.cwd().resolve()
if not _base_dirs:
if rp.is_relative_to(cwd):
return True
for bd in _base_dirs:
if rp.is_relative_to(bd):
return True
return False
# ------------------------------------------------------------------ tool implementations
def search_files(path: str, pattern: str) -> str:
"""
Search for files matching a glob pattern within path.
pattern examples: '*.py', '**/*.toml', 'src/**/*.rs'
Thin wrapper over search_files_result; the legacy str shape is preserved
for backward compatibility, but the try/except Exception lives in the
Result variant (where it can capture ErrorInfo with kind=INTERNAL).
"""
resolved = search_files_result(path, pattern)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def list_directory(path: str) -> str:
"""List entries in a directory. Returns a compact text table.
Thin wrapper over list_directory_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = list_directory_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def read_file(path: str) -> str:
"""Return the UTF-8 content of a file, or an error string.
Thin wrapper over read_file_result; the legacy str shape is preserved
for backward compatibility, but the try/except Exception lives in
the Result variant.
"""
resolved = read_file_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
#region: Result Variants
def _resolve_and_check_result(raw_path: str) -> Result[Path]:
try:
p = Path(raw_path)
if not p.is_absolute() and _primary_base_dir:
p = _primary_base_dir / p
p = p.resolve()
except Exception as e:
return Result(
data=NilPath(),
errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=str(e), source="mcp._resolve_and_check_result", original=e)],
)
if not _is_allowed(p):
allowed_bases = "\n".join([f" - {d}" for d in _base_dirs])
return Result(
data=NilPath(),
errors=[ErrorInfo(
kind=ErrorKind.PERMISSION,
message=f"path '{raw_path}' resolves to '{p}', which is not within the allowed paths.\nAllowed base directories are:\n{allowed_bases}",
source="mcp._resolve_and_check_result",
)],
)
return Result(data=p)
def read_file_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.read_file_result")])
if not p.is_file():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a file: {path}", source="mcp.read_file_result")])
try:
content = p.read_text(encoding="utf-8")
return Result(data=content)
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.read_file_result", original=e)])
def list_directory_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"path not found: {path}", source="mcp.list_directory_result")])
if not p.is_dir():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.list_directory_result")])
try:
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = [f"Directory: {p}", ""]
count = 0
for entry in entries:
name = entry.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
kind = "file" if entry.is_file() else "dir "
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
lines.append(f" [{kind}] {entry.name:<40} {size}")
count += 1
lines.append(f" ({count} entries)")
return Result(data="\n".join(lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.list_directory_result", original=e)])
def search_files_result(path: str, pattern: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_dir():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.search_files_result")])
try:
matches = sorted(p.glob(pattern))
if not matches:
return Result(data=f"No files matched '{pattern}' in {path}")
lines = [f"Search '{pattern}' in {p}:", ""]
count = 0
for m in matches:
name = m.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
rel = m.relative_to(p)
kind = "file" if m.is_file() else "dir "
lines.append(f" [{kind}] {rel}")
count += 1
lines.append(f" ({count} match(es))")
return Result(data="\n".join(lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.search_files_result", original=e)])
def edit_file_result(path: str, old_string: str, new_string: str, replace_all: bool = False) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.edit_file_result")])
if not old_string:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message="old_string cannot be empty", source="mcp.edit_file_result")])
try:
content = p.read_text(encoding="utf-8")
if old_string not in content:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"old_string not found in '{path}'", source="mcp.edit_file_result")])
count = content.count(old_string)
if count > 1 and not replace_all:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique.", source="mcp.edit_file_result")])
if replace_all:
new_content = content.replace(old_string, new_string)
p.write_text(new_content, encoding="utf-8")
return Result(data=f"Successfully replaced {count} occurrences in '{path}'")
new_content = content.replace(old_string, new_string, 1)
p.write_text(new_content, encoding="utf-8")
return Result(data=f"Successfully replaced 1 occurrence in '{path}'")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.edit_file_result", original=e)])
def get_file_summary_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.get_file_summary_result")])
if not p.is_file():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a file: {path}", source="mcp.get_file_summary_result")])
try:
content = p.read_text(encoding="utf-8")
return Result(data=summarize.summarise_file(p, content))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.get_file_summary_result", original=e)])
def get_file_slice_result(path: str, start_line: int, end_line: int) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.get_file_slice_result")])
try:
lines = p.read_text(encoding="utf-8").splitlines(keepends=True)
start_idx = start_line - 1
end_idx = end_line
return Result(data="".join(lines[start_idx:end_idx]))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.get_file_slice_result", original=e)])
def set_file_slice_result(path: str, start_line: int, end_line: int, new_content: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.set_file_slice_result")])
try:
lines = p.read_text(encoding="utf-8").splitlines(keepends=True)
start_idx = start_line - 1
end_idx = end_line
if new_content and not new_content.endswith("\n"):
new_content += "\n"
new_lines = new_content.splitlines(keepends=True) if new_content else []
lines[start_idx:end_idx] = new_lines
p.write_text("".join(lines), encoding="utf-8")
return Result(data=f"Successfully updated lines {start_line}-{end_line} in {path}")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.set_file_slice_result", original=e)])
def get_git_diff_result(path: str, base_rev: str = "HEAD", head_rev: str = "") -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
cmd = ["git", "diff", base_rev]
if head_rev:
cmd.append(head_rev)
cmd.extend(["--", str(p)])
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8")
return Result(data=result.stdout if result.stdout else "(no changes)")
except subprocess.CalledProcessError as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=f"git diff failed: {e.stderr}", source="mcp.get_git_diff_result")])
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.get_git_diff_result", original=e)])
def _ast_get_skeleton(code: str, lang: str, path_str: str) -> str:
from src.file_cache import ASTParser
return ASTParser(lang).get_skeleton(code, path=path_str)
def _ast_get_code_outline(code: str, lang: str, path_str: str) -> str:
from src.file_cache import ASTParser
return ASTParser(lang).get_code_outline(code, path=path_str)
def _ast_get_definition(code: str, lang: str, name: str, path_str: str) -> str:
from src.file_cache import ASTParser
return ASTParser(lang).get_definition(code, name, path=path_str)
def _ast_get_signature(code: str, lang: str, name: str, path_str: str) -> str:
from src.file_cache import ASTParser
return ASTParser(lang).get_signature(code, name, path=path_str)
def _ast_update_definition(code: str, lang: str, name: str, new_content: str, path_str: str) -> str:
from src.file_cache import ASTParser
return ASTParser(lang).update_definition(code, name, new_content, path=path_str)
def ts_c_get_skeleton_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_skeleton_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_skeleton(code, "c", str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_skeleton_result", original=e)])
def ts_c_get_code_outline_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_code_outline_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_code_outline(code, "c", str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_code_outline_result", original=e)])
def ts_c_get_definition_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_definition_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_definition(code, "c", name, str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_definition_result", original=e)])
def ts_c_get_signature_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_signature_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_signature(code, "c", name, str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_signature_result", original=e)])
def ts_c_update_definition_result(path: str, name: str, new_content: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_update_definition_result")])
try:
code = p.read_text(encoding="utf-8")
updated_code = _ast_update_definition(code, "c", name, new_content, str(p))
if updated_code.startswith("ERROR:"):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=updated_code, source="mcp.ts_c_update_definition_result")])
p.write_text(updated_code, encoding="utf-8")
return Result(data=f"Successfully updated definition '{name}' in {path}")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_update_definition_result", original=e)])
def ts_cpp_get_skeleton_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_skeleton_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_skeleton(code, "cpp", str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_skeleton_result", original=e)])
def ts_cpp_get_code_outline_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_code_outline_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_code_outline(code, "cpp", str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_code_outline_result", original=e)])
def ts_cpp_get_definition_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_definition_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_definition(code, "cpp", name, str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_definition_result", original=e)])
def ts_cpp_get_signature_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_signature_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=_ast_get_signature(code, "cpp", name, str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_signature_result", original=e)])
def ts_cpp_update_definition_result(path: str, name: str, new_content: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_update_definition_result")])
try:
code = p.read_text(encoding="utf-8")
updated_code = _ast_update_definition(code, "cpp", name, new_content, str(p))
if updated_code.startswith("ERROR:"):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=updated_code, source="mcp.ts_cpp_update_definition_result")])
p.write_text(updated_code, encoding="utf-8")
return Result(data=f"Successfully updated definition '{name}' in {path}")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_update_definition_result", original=e)])
def py_get_skeleton_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_skeleton_result")])
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
return Result(data=parser.get_skeleton(code, path=str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_skeleton_result", original=e)])
def py_get_code_outline_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_code_outline_result")])
if not p.is_file():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a file: {path}", source="mcp.py_get_code_outline_result")])
try:
code = p.read_text(encoding="utf-8")
return Result(data=outline_tool.get_outline(p, code))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_code_outline_result", original=e)])
def py_get_symbol_info_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_symbol_info_result")])
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
return Result(data=parser.get_symbol_info(code, name, path=str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_symbol_info_result", original=e)])
def py_get_definition_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_definition_result")])
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
return Result(data=parser.get_definition(code, name, path=str(p)))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_definition_result", original=e)])
def py_update_definition_result(path: str, name: str, new_content: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_update_definition_result")])
try:
from src.file_cache import ASTParser
code = p.read_text(encoding="utf-8")
parser = ASTParser("python")
updated_code = parser.update_definition(code, name, new_content, path=str(p))
if updated_code.startswith("ERROR:"):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=updated_code, source="mcp.py_update_definition_result")])
p.write_text(updated_code, encoding="utf-8")
return Result(data=f"Successfully updated definition '{name}' in {path}")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_update_definition_result", original=e)])
def py_get_signature_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_signature_result")])
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines(keepends=True)
tree = ast.parse(code)
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.FunctionDef, ast.AsyncFunctionDef)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find function/method '{name}' in {path}", source="mcp.py_get_signature_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno")) - 1
body_start = cast(int, getattr(node.body[0], "lineno")) - 1
sig = "".join(lines[start:body_start]).rstrip()
if not sig.endswith(":"):
for j in range(body_start, len(lines)):
if ":" in lines[j]:
full_line = lines[j]
colon_idx = full_line.index(":")
sig = "".join(lines[start:j+1])
return Result(data=sig[:colon_idx+1].strip())
return Result(data=sig)
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_signature_result", original=e)])
def py_set_signature_result(path: str, name: str, new_signature: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_set_signature_result")])
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.FunctionDef, ast.AsyncFunctionDef)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find function/method '{name}' in {path}", source="mcp.py_set_signature_result")])
node = node_result.data
start = node.lineno
body_start_line = node.body[0].lineno
end = body_start_line - 1
inner = set_file_slice_result(path, start, end, new_signature)
return inner
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_set_signature_result", original=e)])
def py_get_class_summary_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.py_get_class_summary_result")])
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, ast.ClassDef):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find class '{name}' in {path}", source="mcp.py_get_class_summary_result")])
node = node_result.data
lines = code.splitlines(keepends=True)
summary = [f"Class: {name}"]
doc = ast.get_docstring(node)
if doc:
summary.append(f" Docstring: {doc}")
for body_node in node.body:
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
start = body_node.lineno - 1
body_start = body_node.body[0].lineno - 1
sig = "".join(lines[start:body_start]).strip()
summary.append(f" - {sig}")
return Result(data="\n".join(summary))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_class_summary_result", original=e)])
def py_get_var_declaration_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_file() or p.suffix != ".py":
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a python file: {path}", source="mcp.py_get_var_declaration_result")])
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines(keepends=True)
tree = ast.parse(code)
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.Assign, ast.AnnAssign)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find variable '{name}' in {path}", source="mcp.py_get_var_declaration_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno")) - 1
end = cast(int, getattr(node, "end_lineno"))
return Result(data="".join(lines[start:end]))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_var_declaration_result", original=e)])
def py_set_var_declaration_result(path: str, name: str, new_declaration: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_file() or p.suffix != ".py":
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a python file: {path}", source="mcp.py_set_var_declaration_result")])
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.Assign, ast.AnnAssign)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find variable '{name}' in {path}", source="mcp.py_set_var_declaration_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno"))
end = cast(int, getattr(node, "end_lineno"))
inner = set_file_slice_result(path, start, end, new_declaration)
return inner
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_set_var_declaration_result", original=e)])
def py_find_usages_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
try:
import re
pattern = re.compile(r"\b" + re.escape(name) + r"\b")
results: list[str] = []
file_errors: list[ErrorInfo] = []
def _search_file(fp: Path) -> None:
if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return
if not _is_allowed(fp): return
try:
text = fp.read_text(encoding="utf-8")
lines = text.splitlines()
for i, line in enumerate(lines, 1):
if pattern.search(line):
rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd())
results.append(f"{rel}:{i}: {line.strip()[:100]}")
except (OSError, UnicodeDecodeError) as e:
errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"failed to read {fp}: {e}", source="mcp.py_find_usages_result._search_file", original=e))
if p.is_file():
_search_file(p)
else:
for root, dirs, files in os.walk(p):
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
for file in files:
if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
_search_file(Path(root) / file)
if not results:
if file_errors:
return Result(data=f"No usages found for '{name}' in {p}", errors=file_errors)
return Result(data=f"No usages found for '{name}' in {p}")
if len(results) > 100:
return Result(data="\n".join(results[:100]) + f"\n... (and {len(results)-100} more)", errors=file_errors)
return Result(data="\n".join(results), errors=file_errors)
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_find_usages_result", original=e)])
def py_get_imports_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_file() or p.suffix != ".py":
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a python file: {path}", source="mcp.py_get_imports_result")])
try:
code = p.read_text(encoding="utf-8")
tree = ast.parse(code)
imports = []
for node in tree.body:
if isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
for alias in node.names:
imports.append(f"{module}.{alias.name}" if module else alias.name)
if not imports:
return Result(data="No imports found.")
return Result(data="Imports:\n" + "\n".join(f" - {i}" for i in imports))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_imports_result", original=e)])
def py_check_syntax_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_file() or p.suffix != ".py":
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a python file: {path}", source="mcp.py_check_syntax_result")])
try:
code = p.read_text(encoding="utf-8")
ast.parse(code)
return Result(data=f"Syntax OK: {path}")
except SyntaxError as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}", source="mcp.py_check_syntax_result")])
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_check_syntax_result", original=e)])
def py_get_docstring_result(path: str, name: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_file() or p.suffix != ".py":
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a python file: {path}", source="mcp.py_get_docstring_result")])
try:
code = p.read_text(encoding="utf-8")
tree = ast.parse(code)
if not name or name == "module":
doc = ast.get_docstring(tree)
return Result(data=doc if doc else "No module docstring found.")
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find symbol '{name}' in {path}", source="mcp.py_get_docstring_result")])
node = node_result.data
if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module)):
doc = ast.get_docstring(node)
return Result(data=doc if doc else f"No docstring found for '{name}'.")
return Result(data=f"No docstring found for '{name}'.")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_get_docstring_result", original=e)])
def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
from src.file_cache import ASTParser
parser = ASTParser("python")
found_path, found_code = None, None
file_errors: list[ErrorInfo] = []
parts = target.split(".")
symbol_name = parts[-1]
if len(parts) > 1:
possible_file = Path(*parts[:-1]).with_suffix(".py")
if possible_file.exists(): found_path = str(possible_file)
if not found_path:
for root in ["src", "simulation"]:
for p in Path(root).rglob("*.py"):
if not _is_allowed(p): continue
code = p.read_text(encoding="utf-8")
if f"def {symbol_name}" in code or f"class {symbol_name}" in code:
try:
tree = ast.parse(code)
if _get_symbol_node_result(tree, symbol_name).ok:
found_path, found_code = str(p), code
break
except (SyntaxError, ValueError) as e:
file_errors.append(ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"failed to parse {p}: {e}", source="mcp.derive_code_path_result", original=e))
continue
if found_path: break
if not found_path:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find definition for '{target}'", source="mcp.derive_code_path_result")])
if not found_code: found_code = Path(found_path).read_text(encoding="utf-8")
visited, output = set(), [f"Code Path for: {target}", "=" * (11 + len(target)), ""]
def trace(name, path, code, depth, indent):
if depth > max_depth or (name, path) in visited: return
visited.add((name, path))
defn = parser.get_definition(code, name, path=path)
if defn.startswith("ERROR:"):
output.append(f"{indent}[!] {name} (Definition not found in {path})")
return
output.append(f"{indent}-> {name} ({path})")
try:
node = ast.parse(defn)
calls = []
for n in ast.walk(node):
if isinstance(n, ast.Call):
if isinstance(n.func, ast.Name): calls.append(n.func.id)
elif isinstance(n.func, ast.Attribute): calls.append(n.func.attr)
for call in sorted(set(calls)):
if call in ("print", "len", "str", "int", "list", "dict", "set", "range", "enumerate", "isinstance", "getattr", "setattr", "hasattr"): continue
c_path, c_code = None, None
full_tree = ast.parse(code)
if _get_symbol_node_result(full_tree, call).ok: c_path, c_code = path, code
else:
for r in ["src", "simulation"]:
for p in Path(r).rglob("*.py"):
if not _is_allowed(p): continue
f_code = p.read_text(encoding="utf-8")
if f"def {call}" in f_code:
c_path, c_code = str(p), f_code
break
if c_path: break
if c_path: trace(call, c_path, c_code, depth + 1, indent + " ")
except (SyntaxError, ValueError, AttributeError) as e:
output.append(f"{indent} [!] Error parsing calls for {name}: {e}")
file_errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"trace error for {name}: {e}", source="mcp.derive_code_path_result._trace", original=e))
try:
trace(symbol_name, found_path, found_code, 0, "")
return Result(data="\n".join(output), errors=file_errors)
except Exception as e:
file_errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.derive_code_path_result", original=e))
return Result(data="", errors=file_errors)
def get_tree_result(path: str, max_depth: int = 2) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_dir():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.get_tree_result")])
m_depth = max_depth
def _build_tree(dir_path: Path, current_depth: int, prefix: str = "") -> list[str]:
if current_depth > m_depth: return []
lines = []
try:
entries = sorted(dir_path.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
except PermissionError:
return []
entries = [e for e in entries if not e.name.startswith('.') and e.name not in ('__pycache__', 'venv', 'env') and e.name != "history.toml" and not e.name.endswith("_history.toml")]
for i, entry in enumerate(entries):
is_last = (i == len(entries) - 1)
connector = "└── " if is_last else "├── "
if entry.is_dir():
lines.append(f"{prefix}{connector}{entry.name}/")
extension = " " if is_last else ""
lines.extend(_build_tree(entry, current_depth + 1, prefix + extension))
else:
lines.append(f"{prefix}{connector}{entry.name}")
return lines
try:
tree_lines = [f"{p.name}/"] + _build_tree(p, 1)
return Result(data="\n".join(tree_lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.get_tree_result", original=e)])
def web_search_result(query: str) -> Result[str]:
url = "https://html.duckduckgo.com/html/?q=" + urllib.parse.quote(query)
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
html = resp.read().decode('utf-8', errors='ignore')
parser = _DDGParser()
parser.feed(html)
if not parser.results:
return Result(data=f"No results found for '{query}'")
lines = [f"Search Results for '{query}':"]
for i, r in enumerate(parser.results[:5], 1):
lines.append(f"{i}. {r['title']}\nURL: {r['link']}\nSnippet: {r['snippet']}\n")
return Result(data="\n".join(lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.web_search_result", original=e)])
def fetch_url_result(url: str) -> Result[str]:
if url.startswith("//duckduckgo.com/l/?uddg="):
split_uddg = url.split("uddg=")
if len(split_uddg) > 1:
url = urllib.parse.unquote(split_uddg[1].split("&")[0])
if not url.startswith("http"):
url = "https://" + url
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
html = resp.read().decode('utf-8', errors='ignore')
parser = _TextExtractor()
parser.feed(html)
full_text = " ".join(parser.text)
full_text = _re.sub(r'\s+', ' ', full_text)
if not full_text.strip():
return Result(data=f"FETCH OK: No readable text extracted from {url}. The page might be empty, JavaScript-heavy, or blocked.")
if len(full_text) > 40000:
return Result(data=full_text[:40000] + "\n... (content truncated)")
return Result(data=full_text)
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.fetch_url_result", original=e)])
def get_ui_performance_result() -> Result[str]:
if perf_monitor_callback is None:
return Result(data="INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running.")
try:
metrics = perf_monitor_callback()
metric_str = str(metrics)
for char in "{}'":
metric_str = metric_str.replace(char, "")
return Result(data=f"UI Performance Snapshot:\n{metric_str}")
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=f"Failed to retrieve UI performance: {str(e)}", source="mcp.get_ui_performance_result", original=e)])
#endregion: Result Variants
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
"""
Replace exact string match in a file. Preserves indentation and line endings.
Drop-in replacement for native edit tool that destroys 1-space indentation.
Thin wrapper over edit_file_result; the legacy str shape is preserved
for backward compatibility, but the try/except Exception lives in
the Result variant.
"""
resolved = edit_file_result(path, old_string, new_string, replace_all)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def get_file_summary(path: str) -> str:
"""
Return the heuristic summary for a file (same as the initial context block).
For .py files: imports, classes, methods, functions, constants.
For .toml: table keys. For .md: headings. Others: line count + preview.
Thin wrapper over get_file_summary_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = get_file_summary_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def get_file_slice(path: str, start_line: int, end_line: int) -> str:
"""Return a specific line range from a file.
Thin wrapper over get_file_slice_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = get_file_slice_result(path, start_line, end_line)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def set_file_slice(path: str, start_line: int, end_line: int, new_content: str) -> str:
"""Replace a specific line range in a file with new content.
Thin wrapper over set_file_slice_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = set_file_slice_result(path, start_line, end_line, new_content)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
"""
Returns the git diff for a file or directory.
base_rev: The base revision (default: HEAD)
head_rev: The head revision (optional)
Thin wrapper over get_git_diff_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = get_git_diff_result(path, base_rev, head_rev)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
#region: C
def ts_c_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a C file.
[C: tests/test_ts_c_tools.py:test_ts_c_get_skeleton]
Thin wrapper over ts_c_get_skeleton_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_c_get_skeleton_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_c_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C file.
[C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline]
Thin wrapper over ts_c_get_code_outline_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_c_get_code_outline_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_c_get_definition(path: str, name: str) -> str:
"""Returns the source code for a specific definition in a C file.
Thin wrapper over ts_c_get_definition_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_c_get_definition_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_c_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function in a C file.
Thin wrapper over ts_c_get_signature_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_c_get_signature_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_c_update_definition(path: str, name: str, new_content: str) -> str:
"""Surgically replace the definition of a function in a C file.
Thin wrapper over ts_c_update_definition_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_c_update_definition_result(path, name, new_content)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
#endregion: C
#region: C++
def ts_cpp_get_skeleton(path: str) -> str:
"""
Returns a skeleton of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton]
Thin wrapper over ts_cpp_get_skeleton_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_cpp_get_skeleton_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_cpp_get_code_outline(path: str) -> str:
"""
Returns a hierarchical outline of a C++ file.
[C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline]
Thin wrapper over ts_cpp_get_code_outline_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_cpp_get_code_outline_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_cpp_get_definition(path: str, name: str) -> str:
"""Returns the source code for a specific definition in a C++ file.
Thin wrapper over ts_cpp_get_definition_result; the legacy str shape
is preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_cpp_get_definition_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_cpp_get_signature(path: str, name: str) -> str:
"""Returns the signature part of a function or method in a C++ file.
Thin wrapper over ts_cpp_get_signature_result; the legacy str shape
is preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_cpp_get_signature_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str:
"""Surgically replace the definition of a class or function in a C++ file.
Thin wrapper over ts_cpp_update_definition_result; the legacy str shape
is preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = ts_cpp_update_definition_result(path, name, new_content)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
#endregion: C++
#region: Python AST
def _get_symbol_node_result(tree: ast.AST, name: str) -> Result[ast.AST]:
"""Result-returning variant of _get_symbol_node."""
parts = name.split(".")
def find_in_scope(scope_node: Any, target_name: str) -> ast.AST | None:
# scope_node could be Module, ClassDef, or FunctionDef
body = getattr(scope_node, "body", [])
for node in body:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
return node
if isinstance(node, ast.Assign):
for t in node.targets:
if isinstance(t, ast.Name) and t.id == target_name:
return node
if isinstance(node, ast.AnnAssign):
if isinstance(node.target, ast.Name) and node.target.id == target_name:
return node
return None
current = tree
for part in parts:
found = find_in_scope(current, part)
if not found:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"Symbol {part!r} not found in scope", source="mcp_client._get_symbol_node_result")])
current = found
return Result(data=current)
def py_get_skeleton(path: str) -> str:
"""Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
Thin wrapper over py_get_skeleton_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_skeleton_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_code_outline(path: str) -> str:
"""Returns a hierarchical outline of a code file (classes, functions, methods with line ranges).
Thin wrapper over py_get_code_outline_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_code_outline_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
"""Returns (source_code, line_number) for a specific class, function, or method definition.
Thin wrapper over py_get_symbol_info_result; the legacy (str, int) | str
shape is preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_symbol_info_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_definition(path: str, name: str) -> str:
"""Returns the source code for a specific class, function, or method definition.
Thin wrapper over py_get_definition_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_definition_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_update_definition(path: str, name: str, new_content: str) -> str:
"""Surgically replace the definition of a class or function.
Thin wrapper over py_update_definition_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_update_definition_result(path, name, new_content)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_signature(path: str, name: str) -> str:
"""Returns only the signature part of a function or method (def line until colon).
Thin wrapper over py_get_signature_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_signature_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_set_signature(path: str, name: str, new_signature: str) -> str:
"""Surgically replace only the signature of a function/method.
Thin wrapper over py_set_signature_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_set_signature_result(path, name, new_signature)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_class_summary(path: str, name: str) -> str:
"""Returns a summary of a class: its methods and their signatures.
Thin wrapper over py_get_class_summary_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_class_summary_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_var_declaration(path: str, name: str) -> str:
"""Get the assignment/declaration line(s) for a module-level or class-level variable.
Thin wrapper over py_get_var_declaration_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_var_declaration_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str:
"""Surgically replace a variable assignment/declaration.
Thin wrapper over py_set_var_declaration_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_set_var_declaration_result(path, name, new_declaration)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_find_usages(path: str, name: str) -> str:
"""Finds exact string matches of a symbol in a given file or directory.
Thin wrapper over py_find_usages_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_find_usages_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_imports(path: str) -> str:
"""Parses a file's AST and returns a strict list of its dependencies.
Thin wrapper over py_get_imports_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_imports_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_check_syntax(path: str) -> str:
"""Runs a quick syntax check on a Python file.
Thin wrapper over py_check_syntax_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_check_syntax_result(path)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_hierarchy(path: str, class_name: str) -> str:
"""Scans the project to find subclasses of a given class.
Thin wrapper over py_get_hierarchy_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_hierarchy_result(path, class_name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def py_get_docstring(path: str, name: str) -> str:
"""Extracts the docstring for a specific module, class, or function.
Thin wrapper over py_get_docstring_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = py_get_docstring_result(path, name)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def derive_code_path(target: str, max_depth: int = 5) -> str:
"""Recursively traces the execution path of a specific function or method.
Thin wrapper over derive_code_path_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = derive_code_path_result(target, max_depth)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
#endregion Python AST
def get_tree(path: str, max_depth: int = 2) -> str:
"""Returns a directory structure up to a max depth.
Thin wrapper over get_tree_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = get_tree_result(path, max_depth)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
class _DDGParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.results: list[dict[str, str]] = []
self.in_result: bool = False
self.in_title: bool = False
self.in_snippet: bool = False
self.current_link: str = ""
self.current_title: str = ""
self.current_snippet: str = ""
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
attrs_dict = dict(attrs)
if tag == "a" and "result__url" in cast(str, attrs_dict.get("class", "")):
self.current_link = cast(str, attrs_dict.get("href", ""))
if tag == "a" and "result__snippet" in cast(str, attrs_dict.get("class", "")):
self.in_snippet = True
if tag == "h2" and "result__title" in cast(str, attrs_dict.get("class", "")):
self.in_title = True
def handle_endtag(self, tag: str) -> None:
if tag == "a" and self.in_snippet:
self.in_snippet = False
if tag == "h2" and self.in_title:
self.in_title = False
if self.current_link:
self.results.append({
"title": self.current_title.strip(),
"link": self.current_link,
"snippet": self.current_snippet.strip()
})
self.current_title = ""
self.current_snippet = ""
self.current_link = ""
def handle_data(self, data: str) -> None:
if self.in_title:
self.current_title += data
if self.in_snippet:
self.current_snippet += data
class _TextExtractor(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.text: list[str] = []
self.hide: int = 0
self.ignore_tags: set[str] = {'script', 'style', 'head', 'meta', 'nav', 'header', 'footer', 'noscript', 'svg'}
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag in self.ignore_tags:
self.hide += 1
def handle_endtag(self, tag: str) -> None:
if tag in self.ignore_tags:
self.hide -= 1
def handle_data(self, data: str) -> None:
if self.hide == 0:
cleaned = data.strip()
if cleaned:
self.text.append(cleaned)
def web_search(query: str) -> str:
"""Search the web using DuckDuckGo HTML and return top results.
Thin wrapper over web_search_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = web_search_result(query)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def fetch_url(url: str) -> str:
"""Fetch a URL and return its text content (stripped of HTML tags).
Thin wrapper over fetch_url_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = fetch_url_result(url)
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
def get_ui_performance() -> str:
"""
Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).
[C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval]
Thin wrapper over get_ui_performance_result; the legacy str shape is
preserved for backward compatibility, but the try/except Exception
lives in the Result variant.
"""
resolved = get_ui_performance_result()
if resolved.ok:
return resolved.data
return "; ".join(e.ui_message() for e in resolved.errors)
# ------------------------------------------------------------------ tool dispatch
class StdioMCPServer:
def __init__(self, config: MCPServerConfig):
self.config = config
self.name = config.name
self.proc = None
self.tools = {}
self._id_counter = 0
self._pending_requests = {}
self.status = 'idle'
def _get_id(self):
self._id_counter += 1
return self._id_counter
async def start(self):
"""
[C: src/multi_agent_conductor.py:WorkerPool.spawn, src/performance_monitor.py:PerformanceMonitor.__init__, tests/test_ai_client_concurrency.py:test_ai_client_tier_isolation, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread, tests/test_conductor_engine_v2.py:side_effect, tests/test_spawn_interception_v2.py:test_confirm_spawn_pushed_to_queue, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
"""
self.status = 'starting'
self.proc = await asyncio.create_subprocess_exec(
self.config.command,
*self.config.args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
asyncio.create_task(self._read_stderr())
await self.list_tools()
self.status = 'running'
async def stop(self):
"""
[C: tests/test_performance_monitor.py:test_perf_monitor_basic_timing, tests/test_performance_monitor.py:test_perf_monitor_component_timing, tests/test_performance_monitor.py:test_perf_monitor_extended_metrics, tests/test_performance_monitor.py:test_perf_monitor_scope_context_manager, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
Best-effort cleanup. Errors during cleanup are accumulated as ErrorInfo and
surfaced via the [MCP:<name>:stop-warning] drain (consistent with _read_stderr
which also uses print() as a stderr/stdout drain for visibility).
"""
if self.proc:
errors: list[ErrorInfo] = []
if self.proc.stdin:
try:
self.proc.stdin.close()
await self.proc.stdin.wait_closed()
except Exception as e:
errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"stdin close: {e}", source="mcp.StdioMCPServer.stop", original=e))
try:
self.proc.terminate()
await self.proc.wait()
except Exception as e:
errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"terminate: {e}", source="mcp.StdioMCPServer.stop", original=e))
for err in errors:
print(f"[MCP:{self.name}:stop-warning] {err.ui_message()}")
self.proc = None
self.status = 'idle'
async def _read_stderr(self):
while self.proc and not self.proc.stdout.at_eof():
line = await self.proc.stderr.readline()
if line:
print(f'[MCP:{self.name}:err] {line.decode().strip()}')
async def _send_request(self, method: str, params: dict = None):
req_id = self._get_id()
request = {
'jsonrpc': '2.0',
'id': req_id,
'method': method,
'params': params or {}
}
self.proc.stdin.write(json.dumps(request).encode() + b'\n')
await self.proc.stdin.drain()
# Simplistic wait for response - in real use, we'd need a read loop
# For now, we'll read one line and hope it's ours (fragile, but for MVP)
line = await self.proc.stdout.readline()
if line:
resp = json.loads(line.decode())
return resp.get('result')
return None
async def list_tools(self):
result = await self._send_request('tools/list')
if result and 'tools' in result:
for t in result['tools']:
self.tools[t['name']] = t
return self.tools
async def call_tool(self, name: str, arguments: dict):
result = await self._send_request('tools/call', {'name': name, 'arguments': arguments})
if result and 'content' in result:
return '\n'.join([c.get('text', '') for c in result['content'] if c.get('type') == 'text'])
return str(result)
class ExternalMCPManager:
"""Manages external MCP servers using the StdioMCPServer class."""
def __init__(self):
"""Initialize the manager with an empty server registry."""
self.servers = {}
async def add_server(self, config: MCPServerConfig):
"""
Add and start a new MCP server from a configuration object.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external]
"""
if config.url:
# RemoteMCPServer placeholder
return
server = StdioMCPServer(config)
await server.start()
self.servers[config.name] = server
async def stop_all(self):
"""
Stop all managed MCP servers and clear the registry.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
for server in self.servers.values():
await server.stop()
self.servers = {}
def get_all_tools(self) -> dict:
"""
Retrieve a dictionary of all tools available across all managed servers.
[C: tests/test_external_mcp.py:test_external_mcp_real_process, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
all_tools = {}
for sname, server in self.servers.items():
for tname, tool in server.tools.items():
all_tools[tname] = {**tool, 'server': sname, 'server_status': server.status}
return all_tools
def get_servers_status(self) -> dict[str, str]:
"""Get the current operational status of all managed servers."""
return {name: server.status for name, server in self.servers.items()}
async def async_dispatch(self, tool_name: str, tool_input: dict) -> str:
"""
Dispatch a tool call to the appropriate external MCP server asynchronously.
[C: src/rag_engine.py:RAGEngine._async_search_mcp, tests/test_external_mcp.py:test_external_mcp_real_process]
"""
for server in self.servers.values():
if tool_name in server.tools:
return await server.call_tool(tool_name, tool_input)
return f'Error: External tool {tool_name} not found.'
_external_mcp_manager = ExternalMCPManager()
def get_external_mcp_manager() -> ExternalMCPManager:
"""
Retrieve the global ExternalMCPManager instance.
[C: tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_external_mcp_e2e.py:test_external_mcp_e2e_refresh_and_call]
"""
global _external_mcp_manager
return _external_mcp_manager
def dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
"""
Dispatch an MCP tool call by name. Returns the result as a string.
[C: tests/test_gemini_cli_edge_cases.py:test_gemini_cli_parameter_resilience, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_mcp_ts_integration.py:test_ts_c_get_code_outline_dispatch, tests/test_mcp_ts_integration.py:test_ts_c_get_definition_dispatch, tests/test_mcp_ts_integration.py:test_ts_c_get_signature_dispatch, tests/test_mcp_ts_integration.py:test_ts_c_get_skeleton_dispatch, tests/test_mcp_ts_integration.py:test_ts_c_update_definition_dispatch, tests/test_mcp_ts_integration.py:test_ts_cpp_get_code_outline_dispatch, tests/test_mcp_ts_integration.py:test_ts_cpp_get_definition_dispatch, tests/test_mcp_ts_integration.py:test_ts_cpp_get_signature_dispatch, tests/test_mcp_ts_integration.py:test_ts_cpp_get_skeleton_dispatch, tests/test_mcp_ts_integration.py:test_ts_cpp_update_definition_dispatch, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration]
"""
# py_struct_tools is loaded on-demand to keep the main-thread import graph lean.
from scripts import py_struct_tools
# Handle aliases
path = str(tool_input.get("path", tool_input.get("file_path", tool_input.get("dir_path", ""))))
if tool_name == "read_file":
return read_file(path)
if tool_name == "list_directory":
return list_directory(path)
if tool_name == "search_files":
return search_files(path, str(tool_input.get("pattern", "*")))
if tool_name == "get_file_summary":
return get_file_summary(path)
if tool_name == "py_get_skeleton":
return py_get_skeleton(path)
if tool_name == "ts_c_get_skeleton":
return ts_c_get_skeleton(path)
if tool_name == "ts_cpp_get_skeleton":
return ts_cpp_get_skeleton(path)
if tool_name == "py_get_code_outline":
return py_get_code_outline(path)
if tool_name == "ts_c_get_code_outline":
return ts_c_get_code_outline(path)
if tool_name == "ts_cpp_get_code_outline":
return ts_cpp_get_code_outline(path)
if tool_name == "ts_c_get_definition":
return ts_c_get_definition(path, str(tool_input.get("name", "")))
if tool_name == "ts_cpp_get_definition":
return ts_cpp_get_definition(path, str(tool_input.get("name", "")))
if tool_name == "ts_c_get_signature":
return ts_c_get_signature(path, str(tool_input.get("name", "")))
if tool_name == "ts_cpp_get_signature":
return ts_cpp_get_signature(path, str(tool_input.get("name", "")))
if tool_name == "ts_c_update_definition":
return ts_c_update_definition(path, str(tool_input.get("name", "")), str(tool_input.get("new_content", "")))
if tool_name == "ts_cpp_update_definition":
return ts_cpp_update_definition(path, str(tool_input.get("name", "")), str(tool_input.get("new_content", "")))
if tool_name == "py_remove_def":
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return "; ".join(e.ui_message() for e in resolved.errors)
if isinstance(resolved.data, NilPath):
return "; ".join(e.ui_message() for e in resolved.errors)
p = resolved.data
return py_struct_tools.py_remove_def(str(p), str(tool_input.get("name", "")))
if tool_name == "py_add_def":
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return "; ".join(e.ui_message() for e in resolved.errors)
if isinstance(resolved.data, NilPath):
return "; ".join(e.ui_message() for e in resolved.errors)
p = resolved.data
return py_struct_tools.py_add_def(
str(p),
str(tool_input.get("name", "")),
str(tool_input.get("new_content", "")),
str(tool_input.get("anchor_type", "")),
tool_input.get("anchor_symbol")
)
if tool_name == "py_move_def":
resolved_src = _resolve_and_check_result(str(tool_input.get("src_path", "")))
if not resolved_src.ok or isinstance(resolved_src.data, NilPath):
return "; ".join(e.ui_message() for e in resolved_src.errors)
resolved_dest = _resolve_and_check_result(str(tool_input.get("dest_path", "")))
if not resolved_dest.ok or isinstance(resolved_dest.data, NilPath):
return "; ".join(e.ui_message() for e in resolved_dest.errors)
return py_struct_tools.py_move_def(
str(resolved_src.data),
str(resolved_dest.data),
str(tool_input.get("name", "")),
str(tool_input.get("dest_name", "")),
str(tool_input.get("anchor_type", "")),
tool_input.get("anchor_symbol")
)
if tool_name == "py_region_wrap":
resolved = _resolve_and_check_result(path)
if not resolved.ok or isinstance(resolved.data, NilPath):
return "; ".join(e.ui_message() for e in resolved.errors)
return py_struct_tools.py_region_wrap(
str(resolved.data),
int(tool_input.get("start_line", 1)),
int(tool_input.get("end_line", 1)),
str(tool_input.get("region_name", ""))
)
if tool_name == "py_get_definition":
return py_get_definition(path, str(tool_input.get("name", "")))
if tool_name == "py_update_definition":
return py_update_definition(path, str(tool_input.get("name", "")), str(tool_input.get("new_content", "")))
if tool_name == "py_get_signature":
return py_get_signature(path, str(tool_input.get("name", "")))
if tool_name == "py_set_signature":
return py_set_signature(path, str(tool_input.get("name", "")), str(tool_input.get("new_signature", "")))
if tool_name == "py_get_class_summary":
return py_get_class_summary(path, str(tool_input.get("name", "")))
if tool_name == "py_get_var_declaration":
return py_get_var_declaration(path, str(tool_input.get("name", "")))
if tool_name == "py_set_var_declaration":
return py_set_var_declaration(path, str(tool_input.get("name", "")), str(tool_input.get("new_declaration", "")))
if tool_name == "get_file_slice":
return get_file_slice(path, int(tool_input.get("start_line", 1)), int(tool_input.get("end_line", 1)))
if tool_name == "set_file_slice":
return set_file_slice(path, int(tool_input.get("start_line", 1)), int(tool_input.get("end_line", 1)), str(tool_input.get("new_content", "")))
if tool_name == "get_git_diff":
return get_git_diff(
path,
str(tool_input.get("base_rev", "HEAD")),
str(tool_input.get("head_rev", ""))
)
if tool_name == "edit_file":
return edit_file(
path,
str(tool_input.get("old_string", "")),
str(tool_input.get("new_string", "")),
bool(tool_input.get("replace_all", False))
)
if tool_name == "web_search":
return web_search(str(tool_input.get("query", "")))
if tool_name == "fetch_url":
return fetch_url(str(tool_input.get("url", "")))
if tool_name == "get_ui_performance":
return get_ui_performance()
if tool_name == "py_find_usages":
return py_find_usages(path, str(tool_input.get("name", "")))
if tool_name == "py_get_imports":
return py_get_imports(path)
if tool_name == "py_check_syntax":
return py_check_syntax(path)
if tool_name == "py_get_hierarchy":
return py_get_hierarchy(path, str(tool_input.get("class_name", "")))
if tool_name == "py_get_docstring":
return py_get_docstring(path, str(tool_input.get("name", "")))
if tool_name == "get_tree":
return get_tree(path, int(tool_input.get("max_depth", 2)))
if tool_name == "derive_code_path":
return derive_code_path(str(tool_input.get("target", "")), int(tool_input.get("max_depth", 5)))
if tool_name == "derive_code_path":
return derive_code_path(str(tool_input.get("target", "")), int(tool_input.get("max_depth", 5)))
# Beads tools
if tool_name.startswith("bd_"):
if not _primary_base_dir:
return "ERROR: no active workspace to run beads tools."
bclient = beads_client.BeadsClient(_primary_base_dir)
if tool_name == "bd_list":
beads = bclient.list_beads()
if not beads:
return "No beads found."
return "\n".join([f"ID: {b.id}, Status: {b.status}, Title: {b.title}" for b in beads])
elif tool_name == "bd_create":
title = str(tool_input.get("title", ""))
desc = str(tool_input.get("description", ""))
bid = bclient.create_bead(title, desc)
return f"Created bead: {bid}"
elif tool_name == "bd_update":
bid = str(tool_input.get("bead_id", ""))
status = str(tool_input.get("status", ""))
if bclient.update_bead(bid, status):
return f"Updated {bid} to status {status}"
return f"ERROR: bead {bid} not found."
elif tool_name == "bd_ready":
return "READY" if bclient.is_initialized() else "NOT_INITIALIZED"
return f"ERROR: unknown MCP tool '{tool_name}'"
async def async_dispatch(tool_name: str, tool_input: dict[str, Any]) -> str:
# Check native tools
"""
[C: src/rag_engine.py:RAGEngine._async_search_mcp, tests/test_external_mcp.py:test_external_mcp_real_process]
"""
native_names = mcp_tool_specs.tool_names()
if tool_name in native_names:
return await asyncio.to_thread(dispatch, tool_name, tool_input)
# Check external tools
if tool_name in get_external_mcp_manager().get_all_tools():
return await get_external_mcp_manager().async_dispatch(tool_name, tool_input)
return f'ERROR: unknown MCP tool {tool_name}'
def get_tool_schemas() -> list[dict[str, Any]]:
"""
[C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mcp_client_dispatch_completeness, tests/test_external_mcp.py:test_get_tool_schemas_includes_external, tests/test_mcp_client_beads.py:test_bd_mcp_tools]
"""
res = [t.to_dict() for t in mcp_tool_specs.get_tool_schemas()]
manager = get_external_mcp_manager()
for tname, tinfo in manager.get_all_tools().items():
from src.type_aliases import ToolDefinition as _TD
td = _TD.from_dict(tinfo) if isinstance(tinfo, dict) else tinfo
res.append({
'name': tname,
'description': td.description,
'parameters': tinfo.get('inputSchema', {'type': 'object', 'properties': {}})
})
return res
# ------------------------------------------------------------------ tool schema helpers
# These are imported by ai_client.py to build provider-specific declarations.
# Tool schemas live in src/mcp_tool_specs.py (the typed ToolSpec registry).
# Backward-compat: TOOL_NAMES re-exports the set for callers that still import it.
# New code should use `from src import mcp_tool_specs; mcp_tool_specs.tool_names()` directly.
TOOL_NAMES: set[str] = mcp_tool_specs.tool_names()