From cf5e7b9925ec3aa1be6e21ca5c71b5f22d9b6da3 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Fri, 12 Jun 2026 17:44:55 -0400 Subject: [PATCH] feat(mcp): add Result-returning variants of resolve, read, list, search Strictly additive: existing _resolve_and_check, read_file, list_directory, and search_files are unchanged. The new variants return Result[Path] or Result[str] using the data-oriented ErrorInfo/ErrorKind convention. --- src/mcp_client.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/src/mcp_client.py b/src/mcp_client.py index b1f45e83..fd4374b7 100644 --- a/src/mcp_client.py +++ b/src/mcp_client.py @@ -72,6 +72,7 @@ from src import beads_client from src import models from src import outline_tool from src import summarize +from src.result_types import ErrorInfo, ErrorKind, NilPath, Result # ------------------------------------------------------------------ mutating tools sentinel @@ -265,6 +266,104 @@ def read_file(path: str) -> str: except Exception as e: return f"ERROR reading '{path}': {e}" +#region: Result Variants +def _resolve_and_check_result(raw_path: str) -> Result[Path]: + try: + p = Path(raw_path) + if not p.is_absolute() and _primary_base_dir: + p = _primary_base_dir / p + p = p.resolve() + except Exception as e: + return Result( + data=NilPath(), + errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=str(e), source="mcp._resolve_and_check_result", original=e)], + ) + if not _is_allowed(p): + allowed_bases = "\n".join([f" - {d}" for d in _base_dirs]) + return Result( + data=NilPath(), + errors=[ErrorInfo( + kind=ErrorKind.PERMISSION, + message=f"path '{raw_path}' resolves to '{p}', which is not within the allowed paths.\nAllowed base directories are:\n{allowed_bases}", + source="mcp._resolve_and_check_result", + )], + ) + return Result(data=p) + +def read_file_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.read_file_result")]) + if not p.is_file(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a file: {path}", source="mcp.read_file_result")]) + try: + content = p.read_text(encoding="utf-8") + return Result(data=content) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.read_file_result", original=e)]) + +def list_directory_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"path not found: {path}", source="mcp.list_directory_result")]) + if not p.is_dir(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.list_directory_result")]) + try: + entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower())) + lines = [f"Directory: {p}", ""] + count = 0 + for entry in entries: + name = entry.name.lower() + if name == "history.toml" or name.endswith("_history.toml"): + continue + kind = "file" if entry.is_file() else "dir " + size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else "" + lines.append(f" [{kind}] {entry.name:<40} {size}") + count += 1 + lines.append(f" ({count} entries)") + return Result(data="\n".join(lines)) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.list_directory_result", original=e)]) + +def search_files_result(path: str, pattern: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.is_dir(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.search_files_result")]) + try: + matches = sorted(p.glob(pattern)) + if not matches: + return Result(data=f"No files matched '{pattern}' in {path}") + lines = [f"Search '{pattern}' in {p}:", ""] + count = 0 + for m in matches: + name = m.name.lower() + if name == "history.toml" or name.endswith("_history.toml"): + continue + rel = m.relative_to(p) + kind = "file" if m.is_file() else "dir " + lines.append(f" [{kind}] {rel}") + count += 1 + lines.append(f" ({count} match(es))") + return Result(data="\n".join(lines)) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.search_files_result", original=e)]) +#endregion: Result Variants + def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: """ Replace exact string match in a file. Preserves indentation and line endings.