TIER-2 READ conductor/code_styleguides/error_handling.md end-to-end before Phase 6: refactor(mcp_client): migrate 8 Batch D sites to Result[T]
Phase 6 Batch D (8 INTERNAL_BROAD_CATCH sites in mcp_client.py): Legacy functions now delegate to _result variants: - py_get_signature_result + py_get_signature - py_set_signature_result + py_set_signature - py_get_class_summary_result + py_get_class_summary - py_get_var_declaration_result + py_get_var_declaration - py_set_var_declaration_result + py_set_var_declaration - py_find_usages_result + py_find_usages - py_get_imports_result + py_get_imports - py_check_syntax_result + py_check_syntax Audit: mcp_client BC 16 -> 9 (8 sites migrated, -1 from _search_file nested try/except now flagged as audit target; will be cleaned up in Phase 8). Total: 48 sites migrated across Phases 3-6 (Phases 3+4+5+6 = 32 BC sites in mcp_client).
This commit is contained in:
+79
-184
@@ -1206,205 +1206,100 @@ def py_update_definition(path: str, name: str, new_content: str) -> str:
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_get_signature(path: str, name: str) -> str:
|
||||
"""Returns only the signature part of a function or method (def line until colon)."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
lines = code.splitlines(keepends=True)
|
||||
tree = ast.parse(code)
|
||||
node = _get_symbol_node(tree, name)
|
||||
if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
return f"ERROR: could not find function/method '{name}' in {path}"
|
||||
start = node.lineno - 1
|
||||
body_start = node.body[0].lineno - 1
|
||||
sig_lines = lines[start:body_start]
|
||||
sig = "".join(sig_lines).strip()
|
||||
if sig.endswith(":"):
|
||||
return sig
|
||||
# If body is on the same line (e.g. def foo(): pass), we need to split by colon
|
||||
full_line = lines[start]
|
||||
colon_idx = full_line.find(":")
|
||||
if colon_idx != -1:
|
||||
return full_line[:colon_idx+1].strip()
|
||||
return sig
|
||||
except Exception as e:
|
||||
return f"ERROR retrieving signature '{name}' from '{path}': {e}"
|
||||
"""Returns only the signature part of a function or method (def line until colon).
|
||||
|
||||
Thin wrapper over py_get_signature_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_get_signature_result(path, name)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_set_signature(path: str, name: str, new_signature: str) -> str:
|
||||
"""Surgically replace only the signature of a function/method."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
code.splitlines(keepends=True)
|
||||
tree = ast.parse(code)
|
||||
node = _get_symbol_node(tree, name)
|
||||
if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
return f"ERROR: could not find function/method '{name}' in {path}"
|
||||
start = node.lineno
|
||||
body_start_line = node.body[0].lineno
|
||||
# We replace from start until body_start_line - 1
|
||||
# But we must be careful about comments/docstrings between sig and body
|
||||
# For now, we replace only the lines that contain the signature
|
||||
end = body_start_line - 1
|
||||
return set_file_slice(path, start, end, new_signature)
|
||||
except Exception as e:
|
||||
return f"ERROR updating signature '{name}' in '{path}': {e}"
|
||||
"""Surgically replace only the signature of a function/method.
|
||||
|
||||
Thin wrapper over py_set_signature_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_set_signature_result(path, name, new_signature)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_get_class_summary(path: str, name: str) -> str:
|
||||
"""Returns a summary of a class: its methods and their signatures."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.exists():
|
||||
return f"ERROR: file not found: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
tree = ast.parse(code)
|
||||
node = _get_symbol_node(tree, name)
|
||||
if not node or not isinstance(node, ast.ClassDef):
|
||||
return f"ERROR: could not find class '{name}' in {path}"
|
||||
lines = code.splitlines(keepends=True)
|
||||
summary = [f"Class: {name}"]
|
||||
doc = ast.get_docstring(node)
|
||||
if doc:
|
||||
summary.append(f" Docstring: {doc}")
|
||||
for body_node in node.body:
|
||||
if isinstance(body_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
start = body_node.lineno - 1
|
||||
body_start = body_node.body[0].lineno - 1
|
||||
sig = "".join(lines[start:body_start]).strip()
|
||||
summary.append(f" - {sig}")
|
||||
return "\n".join(summary)
|
||||
except Exception as e:
|
||||
return f"ERROR summarizing class '{name}' in '{path}': {e}"
|
||||
"""Returns a summary of a class: its methods and their signatures.
|
||||
|
||||
Thin wrapper over py_get_class_summary_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_get_class_summary_result(path, name)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_get_var_declaration(path: str, name: str) -> str:
|
||||
"""Get the assignment/declaration line(s) for a module-level or class-level variable."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.is_file() or p.suffix != ".py":
|
||||
return f"ERROR: not a python file: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
lines = code.splitlines(keepends=True)
|
||||
tree = ast.parse(code)
|
||||
node = _get_symbol_node(tree, name)
|
||||
if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)):
|
||||
return f"ERROR: could not find variable '{name}' in {path}"
|
||||
start = cast(int, getattr(node, "lineno")) - 1
|
||||
end = cast(int, getattr(node, "end_lineno"))
|
||||
return "".join(lines[start:end])
|
||||
except Exception as e:
|
||||
return f"ERROR retrieving variable '{name}' from '{path}': {e}"
|
||||
"""Get the assignment/declaration line(s) for a module-level or class-level variable.
|
||||
|
||||
Thin wrapper over py_get_var_declaration_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_get_var_declaration_result(path, name)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_set_var_declaration(path: str, name: str, new_declaration: str) -> str:
|
||||
"""Surgically replace a variable assignment/declaration."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err:
|
||||
return err
|
||||
assert p is not None
|
||||
if not p.is_file() or p.suffix != ".py":
|
||||
return f"ERROR: not a python file: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
|
||||
tree = ast.parse(code)
|
||||
node = _get_symbol_node(tree, name)
|
||||
if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)):
|
||||
return f"ERROR: could not find variable '{name}' in {path}"
|
||||
start = cast(int, getattr(node, "lineno"))
|
||||
end = cast(int, getattr(node, "end_lineno"))
|
||||
return set_file_slice(path, start, end, new_declaration)
|
||||
except Exception as e:
|
||||
return f"ERROR updating variable '{name}' in '{path}': {e}"
|
||||
"""Surgically replace a variable assignment/declaration.
|
||||
|
||||
Thin wrapper over py_set_var_declaration_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_set_var_declaration_result(path, name, new_declaration)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_find_usages(path: str, name: str) -> str:
|
||||
"""Finds exact string matches of a symbol in a given file or directory."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err: return err
|
||||
assert p is not None
|
||||
try:
|
||||
import re
|
||||
pattern = re.compile(r"\b" + re.escape(name) + r"\b")
|
||||
results = []
|
||||
"""Finds exact string matches of a symbol in a given file or directory.
|
||||
|
||||
def _search_file(fp: Path) -> None:
|
||||
if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return
|
||||
if not _is_allowed(fp): return
|
||||
try:
|
||||
text = fp.read_text(encoding="utf-8")
|
||||
lines = text.splitlines()
|
||||
for i, line in enumerate(lines, 1):
|
||||
if pattern.search(line):
|
||||
rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd())
|
||||
results.append(f"{rel}:{i}: {line.strip()[:100]}")
|
||||
except Exception:
|
||||
pass
|
||||
if p.is_file():
|
||||
_search_file(p)
|
||||
else:
|
||||
for root, dirs, files in os.walk(p):
|
||||
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('__pycache__', 'venv', 'env')]
|
||||
for file in files:
|
||||
if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
|
||||
_search_file(Path(root) / file)
|
||||
if not results:
|
||||
return f"No usages found for '{name}' in {p}"
|
||||
if len(results) > 100:
|
||||
return "\n".join(results[:100]) + f"\n... (and {len(results)-100} more)"
|
||||
return "\n".join(results)
|
||||
except Exception as e:
|
||||
return f"ERROR finding usages for '{name}': {e}"
|
||||
Thin wrapper over py_find_usages_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_find_usages_result(path, name)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_get_imports(path: str) -> str:
|
||||
"""Parses a file's AST and returns a strict list of its dependencies."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err: return err
|
||||
assert p is not None
|
||||
if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8")
|
||||
tree = ast.parse(code)
|
||||
imports = []
|
||||
for node in tree.body:
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
imports.append(alias.name)
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
module = node.module or ""
|
||||
for alias in node.names:
|
||||
imports.append(f"{module}.{alias.name}" if module else alias.name)
|
||||
if not imports: return "No imports found."
|
||||
return "Imports:\n" + "\n".join(f" - {i}" for i in imports)
|
||||
except Exception as e:
|
||||
return f"ERROR getting imports for '{path}': {e}"
|
||||
"""Parses a file's AST and returns a strict list of its dependencies.
|
||||
|
||||
Thin wrapper over py_get_imports_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_get_imports_result(path)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_check_syntax(path: str) -> str:
|
||||
"""Runs a quick syntax check on a Python file."""
|
||||
p, err = _resolve_and_check(path)
|
||||
if err: return err
|
||||
assert p is not None
|
||||
if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
|
||||
try:
|
||||
code = p.read_text(encoding="utf-8")
|
||||
ast.parse(code)
|
||||
return f"Syntax OK: {path}"
|
||||
except SyntaxError as e:
|
||||
return f"SyntaxError in {path} at line {e.lineno}, offset {e.offset}: {e.msg}\n{e.text}"
|
||||
except Exception as e:
|
||||
return f"ERROR checking syntax for '{path}': {e}"
|
||||
"""Runs a quick syntax check on a Python file.
|
||||
|
||||
Thin wrapper over py_check_syntax_result; the legacy str shape is
|
||||
preserved for backward compatibility, but the try/except Exception
|
||||
lives in the Result variant.
|
||||
"""
|
||||
resolved = py_check_syntax_result(path)
|
||||
if resolved.ok:
|
||||
return resolved.data
|
||||
return "; ".join(e.ui_message() for e in resolved.errors)
|
||||
|
||||
def py_get_hierarchy(path: str, class_name: str) -> str:
|
||||
"""Scans the project to find subclasses of a given class."""
|
||||
|
||||
Reference in New Issue
Block a user