refactor(mcp_client): migrate 3 nested helper BC sites to Result-drain (Phase 8)
Three nested helper functions inside _result variants had silent-swallow or broad-catch patterns that the audit still flagged: 1. py_find_usages_result._search_file (L846): Was: 'try/except Exception: pass' (silent-swallow per-file read errors) Now: try/except (OSError, UnicodeDecodeError) as e: errors.append(ErrorInfo(...)) Errors propagated via the parent's Result.errors 2. derive_code_path_result (L957): Was: 'try/except Exception: continue' (silent-swallow file parse errors) Now: try/except (SyntaxError, ValueError) as e: file_errors.append(ErrorInfo(...)) Errors propagated via the parent's Result.errors 3. derive_code_path_result._trace (L996): Was: try/except Exception as e: output.append(f-string with error) Now: same output.append + ALSO appends ErrorInfo to file_errors Drain: output appears in the result data string (operator-visible) All 3 sites now comply with the data-oriented convention. Audit: mcp_client migration-target sites: 0 (was 3). Categories: BOUNDARY_CONVERSION: 5, INTERNAL_COMPLIANT: 43
This commit is contained in:
+17
-9
@@ -832,7 +832,8 @@ def py_find_usages_result(path: str, name: str) -> Result[str]:
|
||||
try:
|
||||
import re
|
||||
pattern = re.compile(r"\b" + re.escape(name) + r"\b")
|
||||
results = []
|
||||
results: list[str] = []
|
||||
file_errors: list[ErrorInfo] = []
|
||||
def _search_file(fp: Path) -> None:
|
||||
if fp.name == "history.toml" or fp.name.endswith("_history.toml"): return
|
||||
if not _is_allowed(fp): return
|
||||
@@ -843,8 +844,8 @@ def py_find_usages_result(path: str, name: str) -> Result[str]:
|
||||
if pattern.search(line):
|
||||
rel = fp.relative_to(_primary_base_dir if _primary_base_dir else Path.cwd())
|
||||
results.append(f"{rel}:{i}: {line.strip()[:100]}")
|
||||
except Exception:
|
||||
pass
|
||||
except (OSError, UnicodeDecodeError) as e:
|
||||
errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"failed to read {fp}: {e}", source="mcp.py_find_usages_result._search_file", original=e))
|
||||
if p.is_file():
|
||||
_search_file(p)
|
||||
else:
|
||||
@@ -854,10 +855,12 @@ def py_find_usages_result(path: str, name: str) -> Result[str]:
|
||||
if file.endswith(('.py', '.md', '.toml', '.txt', '.json')):
|
||||
_search_file(Path(root) / file)
|
||||
if not results:
|
||||
if file_errors:
|
||||
return Result(data=f"No usages found for '{name}' in {p}", errors=file_errors)
|
||||
return Result(data=f"No usages found for '{name}' in {p}")
|
||||
if len(results) > 100:
|
||||
return Result(data="\n".join(results[:100]) + f"\n... (and {len(results)-100} more)")
|
||||
return Result(data="\n".join(results))
|
||||
return Result(data="\n".join(results[:100]) + f"\n... (and {len(results)-100} more)", errors=file_errors)
|
||||
return Result(data="\n".join(results), errors=file_errors)
|
||||
except Exception as e:
|
||||
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.py_find_usages_result", original=e)])
|
||||
|
||||
@@ -935,6 +938,7 @@ def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
|
||||
from src.file_cache import ASTParser
|
||||
parser = ASTParser("python")
|
||||
found_path, found_code = None, None
|
||||
file_errors: list[ErrorInfo] = []
|
||||
parts = target.split(".")
|
||||
symbol_name = parts[-1]
|
||||
if len(parts) > 1:
|
||||
@@ -951,7 +955,9 @@ def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
|
||||
if _get_symbol_node(tree, symbol_name):
|
||||
found_path, found_code = str(p), code
|
||||
break
|
||||
except Exception: continue
|
||||
except (SyntaxError, ValueError) as e:
|
||||
file_errors.append(ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"failed to parse {p}: {e}", source="mcp.derive_code_path_result", original=e))
|
||||
continue
|
||||
if found_path: break
|
||||
if not found_path:
|
||||
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find definition for '{target}'", source="mcp.derive_code_path_result")])
|
||||
@@ -987,13 +993,15 @@ def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
|
||||
break
|
||||
if c_path: break
|
||||
if c_path: trace(call, c_path, c_code, depth + 1, indent + " ")
|
||||
except Exception as e:
|
||||
except (SyntaxError, ValueError, AttributeError) as e:
|
||||
output.append(f"{indent} [!] Error parsing calls for {name}: {e}")
|
||||
file_errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=f"trace error for {name}: {e}", source="mcp.derive_code_path_result._trace", original=e))
|
||||
try:
|
||||
trace(symbol_name, found_path, found_code, 0, "")
|
||||
return Result(data="\n".join(output))
|
||||
return Result(data="\n".join(output), errors=file_errors)
|
||||
except Exception as e:
|
||||
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.derive_code_path_result", original=e)])
|
||||
file_errors.append(ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.derive_code_path_result", original=e))
|
||||
return Result(data="", errors=file_errors)
|
||||
|
||||
def get_tree_result(path: str, max_depth: int = 2) -> Result[str]:
|
||||
resolved = _resolve_and_check_result(path)
|
||||
|
||||
Reference in New Issue
Block a user