finish first pass on mcp client org
This commit is contained in:
+85
-95
@@ -134,16 +134,14 @@ def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | Non
|
||||
|
||||
def _is_allowed(path: Path) -> bool:
|
||||
"""
|
||||
|
||||
Return True if `path` is within the allowlist.
|
||||
A path is allowed if:
|
||||
- it is explicitly in _allowed_paths, OR
|
||||
- it is contained within (or equal to) one of the _base_dirs
|
||||
All paths are resolved (follows symlinks) before comparison to prevent
|
||||
symlink-based path traversal.
|
||||
|
||||
Return True if `path` is within the allowlist.
|
||||
A path is allowed if:
|
||||
- it is explicitly in _allowed_paths, OR
|
||||
- it is contained within (or equal to) one of the _base_dirs
|
||||
All paths are resolved (follows symlinks) before comparison to prevent
|
||||
symlink-based path traversal.
|
||||
|
||||
CRITICAL: Blacklisted files (history) are NEVER allowed.
|
||||
CRITICAL: Blacklisted files (history) are NEVER allowed.
|
||||
[C: tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_history_management.py:test_mcp_blacklist]
|
||||
"""
|
||||
from src.paths import get_config_path, get_credentials_path
|
||||
@@ -181,10 +179,8 @@ def _is_allowed(path: Path) -> bool:
|
||||
|
||||
def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
|
||||
"""
|
||||
|
||||
|
||||
Resolve raw_path and verify it passes the allowlist check.
|
||||
Returns (resolved_path, error_string). error_string is empty on success.
|
||||
Resolve raw_path and verify it passes the allowlist check.
|
||||
Returns (resolved_path, error_string). error_string is empty on success.
|
||||
"""
|
||||
try:
|
||||
p = Path(raw_path)
|
||||
@@ -202,43 +198,6 @@ def _resolve_and_check(raw_path: str) -> tuple[Path | None, str]:
|
||||
return p, ""
|
||||
# ------------------------------------------------------------------ tool implementations
|
||||
|
||||
def read_file(path: str) -> str:
|
||||
"""Return the UTF-8 content of a file, or an error string."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err or p is None:
|
||||
return err
|
||||
if not p.exists(): return f"ERROR: file not found: {path}"
|
||||
if not p.is_file(): return f"ERROR: not a file: {path}"
|
||||
try:
|
||||
return p.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
return f"ERROR reading '{path}': {e}"
|
||||
|
||||
def list_directory(path: str) -> str:
|
||||
"""List entries in a directory. Returns a compact text table."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err or p is None:
|
||||
return err
|
||||
if not p.exists(): return f"ERROR: path not found: {path}"
|
||||
if not p.is_dir(): return f"ERROR: not a directory: {path}"
|
||||
try:
|
||||
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
|
||||
lines = [f"Directory: {p}", ""]
|
||||
count = 0
|
||||
for entry in entries:
|
||||
# Blacklist check
|
||||
name = entry.name.lower()
|
||||
if name == "history.toml" or name.endswith("_history.toml"):
|
||||
continue
|
||||
kind = "file" if entry.is_file() else "dir "
|
||||
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
|
||||
lines.append(f" [{kind}] {entry.name:<40} {size}")
|
||||
count += 1
|
||||
lines.append(f" ({count} entries)")
|
||||
return "\n".join(lines)
|
||||
except Exception as e:
|
||||
return f"ERROR listing '{path}': {e}"
|
||||
|
||||
def search_files(path: str, pattern: str) -> str:
|
||||
"""
|
||||
Search for files matching a glob pattern within path.
|
||||
@@ -269,6 +228,74 @@ def search_files(path: str, pattern: str) -> str:
|
||||
except Exception as e:
|
||||
return f"ERROR searching '{path}': {e}"
|
||||
|
||||
def list_directory(path: str) -> str:
|
||||
"""List entries in a directory. Returns a compact text table."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err or p is None:
|
||||
return err
|
||||
if not p.exists(): return f"ERROR: path not found: {path}"
|
||||
if not p.is_dir(): return f"ERROR: not a directory: {path}"
|
||||
try:
|
||||
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
|
||||
lines = [f"Directory: {p}", ""]
|
||||
count = 0
|
||||
for entry in entries:
|
||||
# Blacklist check
|
||||
name = entry.name.lower()
|
||||
if name == "history.toml" or name.endswith("_history.toml"):
|
||||
continue
|
||||
kind = "file" if entry.is_file() else "dir "
|
||||
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
|
||||
lines.append(f" [{kind}] {entry.name:<40} {size}")
|
||||
count += 1
|
||||
lines.append(f" ({count} entries)")
|
||||
return "\n".join(lines)
|
||||
except Exception as e:
|
||||
return f"ERROR listing '{path}': {e}"
|
||||
|
||||
def read_file(path: str) -> str:
|
||||
"""Return the UTF-8 content of a file, or an error string."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err or p is None:
|
||||
return err
|
||||
if not p.exists(): return f"ERROR: file not found: {path}"
|
||||
if not p.is_file(): return f"ERROR: not a file: {path}"
|
||||
try:
|
||||
return p.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
return f"ERROR reading '{path}': {e}"
|
||||
|
||||
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
|
||||
"""
|
||||
Replace exact string match in a file. Preserves indentation and line endings.
|
||||
Drop-in replacement for native edit tool that destroys 1-space indentation.
|
||||
"""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
if not old_string:
|
||||
return "ERROR: old_string cannot be empty"
|
||||
try:
|
||||
content = p.read_text(encoding="utf-8")
|
||||
if old_string not in content:
|
||||
return f"ERROR: old_string not found in '{path}'"
|
||||
count = content.count(old_string)
|
||||
if count > 1 and not replace_all:
|
||||
return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique."
|
||||
if replace_all:
|
||||
new_content = content.replace(old_string, new_string)
|
||||
p.write_text(new_content, encoding="utf-8")
|
||||
return f"Successfully replaced {count} occurrences in '{path}'"
|
||||
else:
|
||||
new_content = content.replace(old_string, new_string, 1)
|
||||
p.write_text(new_content, encoding="utf-8")
|
||||
return f"Successfully replaced 1 occurrence in '{path}'"
|
||||
except Exception as e:
|
||||
return f"ERROR editing '{path}': {e}"
|
||||
|
||||
def get_file_summary(path: str) -> str:
|
||||
"""
|
||||
Return the heuristic summary for a file (same as the initial context block).
|
||||
@@ -325,37 +352,6 @@ def set_file_slice(path: str, start_line: int, end_line: int, new_content: str)
|
||||
except Exception as e:
|
||||
return f"ERROR updating slice in '{path}': {e}"
|
||||
|
||||
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
|
||||
"""
|
||||
Replace exact string match in a file. Preserves indentation and line endings.
|
||||
Drop-in replacement for native edit tool that destroys 1-space indentation.
|
||||
"""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
if not old_string:
|
||||
return "ERROR: old_string cannot be empty"
|
||||
try:
|
||||
content = p.read_text(encoding="utf-8")
|
||||
if old_string not in content:
|
||||
return f"ERROR: old_string not found in '{path}'"
|
||||
count = content.count(old_string)
|
||||
if count > 1 and not replace_all:
|
||||
return f"ERROR: Found {count} matches for old_string in '{path}'. Use replace_all=true or provide more context to make it unique."
|
||||
if replace_all:
|
||||
new_content = content.replace(old_string, new_string)
|
||||
p.write_text(new_content, encoding="utf-8")
|
||||
return f"Successfully replaced {count} occurrences in '{path}'"
|
||||
else:
|
||||
new_content = content.replace(old_string, new_string, 1)
|
||||
p.write_text(new_content, encoding="utf-8")
|
||||
return f"Successfully replaced 1 occurrence in '{path}'"
|
||||
except Exception as e:
|
||||
return f"ERROR editing '{path}': {e}"
|
||||
|
||||
def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str:
|
||||
"""
|
||||
Returns the git diff for a file or directory.
|
||||
@@ -601,13 +597,10 @@ def py_get_code_outline(path: str) -> str:
|
||||
except Exception as e:
|
||||
return f"ERROR generating outline for '{path}': {e}"
|
||||
|
||||
|
||||
def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
|
||||
"""
|
||||
|
||||
|
||||
Returns (source_code, line_number) for a specific class, function, or method definition.
|
||||
If not found, returns an error string.
|
||||
Returns (source_code, line_number) for a specific class, function, or method definition.
|
||||
If not found, returns an error string.
|
||||
"""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
@@ -632,11 +625,9 @@ def py_get_symbol_info(path: str, name: str) -> tuple[str, int] | str:
|
||||
|
||||
def py_get_definition(path: str, name: str) -> str:
|
||||
"""
|
||||
|
||||
|
||||
Returns the source code for a specific class, function, or method definition.
|
||||
path: Path to the code file.
|
||||
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
|
||||
Returns the source code for a specific class, function, or method definition.
|
||||
path: Path to the code file.
|
||||
name: Name of the definition to retrieve (e.g., 'MyClass', 'my_function', 'MyClass.my_method').
|
||||
"""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
@@ -1146,9 +1137,8 @@ def fetch_url(url: str) -> str:
|
||||
|
||||
def get_ui_performance() -> str:
|
||||
"""
|
||||
|
||||
Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).
|
||||
[C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval]
|
||||
Returns current UI performance metrics (FPS, Frame Time, CPU, Input Lag).
|
||||
[C: tests/test_mcp_perf_tool.py:test_mcp_perf_tool_retrieval]
|
||||
"""
|
||||
if perf_monitor_callback is None:
|
||||
return "INFO: UI Performance monitor is not available (headless/CLI mode). This tool is only functional when the Manual Slop GUI is running."
|
||||
@@ -2278,4 +2268,4 @@ MCP_TOOL_SPECS: list[dict[str, Any]] = [
|
||||
}
|
||||
]
|
||||
|
||||
TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS}
|
||||
TOOL_NAMES: set[str] = {t['name'] for t in MCP_TOOL_SPECS}
|
||||
|
||||
Reference in New Issue
Block a user