Private
Public Access
0
0

feat(mcp): add Result-returning variants of resolve, read, list, search

Strictly additive: existing _resolve_and_check, read_file, list_directory,
and search_files are unchanged. The new variants return Result[Path] or
Result[str] using the data-oriented ErrorInfo/ErrorKind convention.
This commit is contained in:
2026-06-12 17:44:55 -04:00
parent de0b49828d
commit cf5e7b9925
+99
View File
@@ -72,6 +72,7 @@ from src import beads_client
from src import models
from src import outline_tool
from src import summarize
from src.result_types import ErrorInfo, ErrorKind, NilPath, Result
# ------------------------------------------------------------------ mutating tools sentinel
@@ -265,6 +266,104 @@ def read_file(path: str) -> str:
except Exception as e:
return f"ERROR reading '{path}': {e}"
#region: Result Variants
def _resolve_and_check_result(raw_path: str) -> Result[Path]:
try:
p = Path(raw_path)
if not p.is_absolute() and _primary_base_dir:
p = _primary_base_dir / p
p = p.resolve()
except Exception as e:
return Result(
data=NilPath(),
errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=str(e), source="mcp._resolve_and_check_result", original=e)],
)
if not _is_allowed(p):
allowed_bases = "\n".join([f" - {d}" for d in _base_dirs])
return Result(
data=NilPath(),
errors=[ErrorInfo(
kind=ErrorKind.PERMISSION,
message=f"path '{raw_path}' resolves to '{p}', which is not within the allowed paths.\nAllowed base directories are:\n{allowed_bases}",
source="mcp._resolve_and_check_result",
)],
)
return Result(data=p)
def read_file_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.read_file_result")])
if not p.is_file():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a file: {path}", source="mcp.read_file_result")])
try:
content = p.read_text(encoding="utf-8")
return Result(data=content)
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.read_file_result", original=e)])
def list_directory_result(path: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.exists():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"path not found: {path}", source="mcp.list_directory_result")])
if not p.is_dir():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.list_directory_result")])
try:
entries = sorted(p.iterdir(), key=lambda e: (e.is_file(), e.name.lower()))
lines = [f"Directory: {p}", ""]
count = 0
for entry in entries:
name = entry.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
kind = "file" if entry.is_file() else "dir "
size = f"{entry.stat().st_size:>10,} bytes" if entry.is_file() else ""
lines.append(f" [{kind}] {entry.name:<40} {size}")
count += 1
lines.append(f" ({count} entries)")
return Result(data="\n".join(lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.list_directory_result", original=e)])
def search_files_result(path: str, pattern: str) -> Result[str]:
resolved = _resolve_and_check_result(path)
if not resolved.ok:
return Result(data="", errors=resolved.errors)
p = resolved.data
if isinstance(p, NilPath):
return Result(data="", errors=resolved.errors)
if not p.is_dir():
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"not a directory: {path}", source="mcp.search_files_result")])
try:
matches = sorted(p.glob(pattern))
if not matches:
return Result(data=f"No files matched '{pattern}' in {path}")
lines = [f"Search '{pattern}' in {p}:", ""]
count = 0
for m in matches:
name = m.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
rel = m.relative_to(p)
kind = "file" if m.is_file() else "dir "
lines.append(f" [{kind}] {rel}")
count += 1
lines.append(f" ({count} match(es))")
return Result(data="\n".join(lines))
except Exception as e:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.search_files_result", original=e)])
#endregion: Result Variants
def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str:
"""
Replace exact string match in a file. Preserves indentation and line endings.