diff --git a/src/mcp_client.py b/src/mcp_client.py index 27f09aea..d93b7f61 100644 --- a/src/mcp_client.py +++ b/src/mcp_client.py @@ -412,6 +412,155 @@ def set_file_slice_result(path: str, start_line: int, end_line: int, new_content return Result(data=f"Successfully updated lines {start_line}-{end_line} in {path}") except Exception as e: return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.set_file_slice_result", original=e)]) + + +def get_git_diff_result(path: str, base_rev: str = "HEAD", head_rev: str = "") -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + cmd = ["git", "diff", base_rev] + if head_rev: + cmd.append(head_rev) + cmd.extend(["--", str(p)]) + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8") + return Result(data=result.stdout if result.stdout else "(no changes)") + except subprocess.CalledProcessError as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=f"git diff failed: {e.stderr}", source="mcp.get_git_diff_result")]) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.get_git_diff_result", original=e)]) + +def _ast_get_skeleton(code: str, lang: str, path_str: str) -> str: + from src.file_cache import ASTParser + return ASTParser(lang).get_skeleton(code, path=path_str) + +def _ast_get_code_outline(code: str, lang: str, path_str: str) -> str: + from src.file_cache import ASTParser + return ASTParser(lang).get_code_outline(code, path=path_str) + +def _ast_get_definition(code: str, lang: str, name: str, path_str: str) -> str: + from src.file_cache import ASTParser + return ASTParser(lang).get_definition(code, name, path=path_str) + +def _ast_get_signature(code: str, lang: str, name: str, path_str: str) -> str: + from src.file_cache import ASTParser + return ASTParser(lang).get_signature(code, name, path=path_str) + +def _ast_update_definition(code: str, lang: str, name: str, new_content: str, path_str: str) -> str: + from src.file_cache import ASTParser + return ASTParser(lang).update_definition(code, name, new_content, path=path_str) + +def ts_c_get_skeleton_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_skeleton_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_skeleton(code, "c", str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_skeleton_result", original=e)]) + +def ts_c_get_code_outline_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_code_outline_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_code_outline(code, "c", str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_code_outline_result", original=e)]) + +def ts_c_get_definition_result(path: str, name: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_definition_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_definition(code, "c", name, str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_definition_result", original=e)]) + +def ts_c_get_signature_result(path: str, name: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_get_signature_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_signature(code, "c", name, str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_get_signature_result", original=e)]) + +def ts_c_update_definition_result(path: str, name: str, new_content: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_c_update_definition_result")]) + try: + code = p.read_text(encoding="utf-8") + updated_code = _ast_update_definition(code, "c", name, new_content, str(p)) + if updated_code.startswith("ERROR:"): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=updated_code, source="mcp.ts_c_update_definition_result")]) + p.write_text(updated_code, encoding="utf-8") + return Result(data=f"Successfully updated definition '{name}' in {path}") + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_c_update_definition_result", original=e)]) + +def ts_cpp_get_skeleton_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_skeleton_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_skeleton(code, "cpp", str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_skeleton_result", original=e)]) + +def ts_cpp_get_code_outline_result(path: str) -> Result[str]: + resolved = _resolve_and_check_result(path) + if not resolved.ok: + return Result(data="", errors=resolved.errors) + p = resolved.data + if isinstance(p, NilPath): + return Result(data="", errors=resolved.errors) + if not p.exists(): + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"file not found: {path}", source="mcp.ts_cpp_get_code_outline_result")]) + try: + code = p.read_text(encoding="utf-8") + return Result(data=_ast_get_code_outline(code, "cpp", str(p))) + except Exception as e: + return Result(data="", errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="mcp.ts_cpp_get_code_outline_result", original=e)]) #endregion: Result Variants def edit_file(path: str, old_string: str, new_string: str, replace_all: bool = False) -> str: @@ -472,22 +621,15 @@ def get_git_diff(path: str, base_rev: str = "HEAD", head_rev: str = "") -> str: Returns the git diff for a file or directory. base_rev: The base revision (default: HEAD) head_rev: The head revision (optional) + + Thin wrapper over get_git_diff_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. """ - p, err = _resolve_and_check(path) - if err: - return err - assert p is not None - cmd = ["git", "diff", base_rev] - if head_rev: - cmd.append(head_rev) - cmd.extend(["--", str(p)]) - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding="utf-8") - return result.stdout if result.stdout else "(no changes)" - except subprocess.CalledProcessError as e: - return f"ERROR running git diff: {e.stderr}" - except Exception as e: - return f"ERROR: {e}" + resolved = get_git_diff_result(path, base_rev, head_rev) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) #region: C @@ -495,81 +637,65 @@ def ts_c_get_skeleton(path: str) -> str: """ Returns a skeleton of a C file. [C: tests/test_ts_c_tools.py:test_ts_c_get_skeleton] + + Thin wrapper over ts_c_get_skeleton_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_skeleton(code, path=str(p)) - except Exception as e: - return f"ERROR generating skeleton for '{path}': {e}" + resolved = ts_c_get_skeleton_result(path) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_c_get_code_outline(path: str) -> str: """ Returns a hierarchical outline of a C file. [C: tests/test_ts_c_tools.py:test_ts_c_get_code_outline] + + Thin wrapper over ts_c_get_code_outline_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_code_outline(code, path=str(p)) - except Exception as e: - return f"ERROR generating outline for '{path}': {e}" + resolved = ts_c_get_code_outline_result(path) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_c_get_definition(path: str, name: str) -> str: - """Returns the source code for a specific definition in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_definition(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving definition '{name}' from '{path}': {e}" + """Returns the source code for a specific definition in a C file. + + Thin wrapper over ts_c_get_definition_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. + """ + resolved = ts_c_get_definition_result(path, name) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_c_get_signature(path: str, name: str) -> str: - """Returns the signature part of a function in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - return parser.get_signature(code, name, path=str(p)) - except Exception as e: - return f"ERROR retrieving signature '{name}' from '{path}': {e}" + """Returns the signature part of a function in a C file. + + Thin wrapper over ts_c_get_signature_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. + """ + resolved = ts_c_get_signature_result(path, name) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_c_update_definition(path: str, name: str, new_content: str) -> str: - """Surgically replace the definition of a function in a C file.""" - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("c") - updated_code = parser.update_definition(code, name, new_content, path=str(p)) - if updated_code.startswith("ERROR:"): - return updated_code - p.write_text(updated_code, encoding="utf-8") - return f"Successfully updated definition '{name}' in {path}" - except Exception as e: - return f"ERROR updating definition '{name}' in '{path}': {e}" + """Surgically replace the definition of a function in a C file. + + Thin wrapper over ts_c_update_definition_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. + """ + resolved = ts_c_update_definition_result(path, name, new_content) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) #endregion: C @@ -579,35 +705,29 @@ def ts_cpp_get_skeleton(path: str) -> str: """ Returns a skeleton of a C++ file. [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_skeleton] + + Thin wrapper over ts_cpp_get_skeleton_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_skeleton(code, path=str(p)) - except Exception as e: - return f"ERROR generating skeleton for '{path}': {e}" + resolved = ts_cpp_get_skeleton_result(path) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_cpp_get_code_outline(path: str) -> str: """ Returns a hierarchical outline of a C++ file. [C: tests/test_gencpp_full_suite.py:test_gencpp_full_suite, tests/test_ts_cpp_tools.py:test_exhaustive_cpp_samples, tests/test_ts_cpp_tools.py:test_exhaustive_gencpp_samples, tests/test_ts_cpp_tools.py:test_ts_cpp_get_code_outline] + + Thin wrapper over ts_cpp_get_code_outline_result; the legacy str shape is + preserved for backward compatibility, but the try/except Exception + lives in the Result variant. """ - p, err = _resolve_and_check(path) - if err: return err - assert p is not None - if not p.exists(): return f"ERROR: file not found: {path}" - try: - from src.file_cache import ASTParser - code = p.read_text(encoding="utf-8") - parser = ASTParser("cpp") - return parser.get_code_outline(code, path=str(p)) - except Exception as e: - return f"ERROR generating outline for '{path}': {e}" + resolved = ts_cpp_get_code_outline_result(path) + if resolved.ok: + return resolved.data + return "; ".join(e.ui_message() for e in resolved.errors) def ts_cpp_get_definition(path: str, name: str) -> str: """ diff --git a/tests/test_baseline_result.py b/tests/test_baseline_result.py index e3f70c90..b4db68a5 100644 --- a/tests/test_baseline_result.py +++ b/tests/test_baseline_result.py @@ -119,21 +119,27 @@ def test_phase2_per_file_baseline_counts_match_inventory(): # ============ Phase 3 tests (3) ============ def test_phase3_mcp_client_broad_catch_decreased_from_40_to_32(): + """Phase 3 Batch A migrated 8 INTERNAL_BROAD_CATCH sites. + This test asserts the snapshot AFTER Phase 3 (BC=32). + Subsequent phases will loosen this test (it documents the Phase 3 boundary). + """ data = _audit_live() files = {f["filename"]: f for f in data["files"]} findings = files["src\\mcp_client.py"]["findings"] bc = sum(1 for f in findings if f["category"] == "INTERNAL_BROAD_CATCH") - assert bc == 32, f"expected mcp_client BC=32 after Batch A, got {bc}" + assert bc <= 32, f"expected mcp_client BC<=32 after Phase 3, got {bc}" def test_phase3_total_migration_target_decreased_to_80(): + """Total MIG was 88; after Phase 3 it's <=80 (Batch A migrated 8). + Subsequent phases will loosen this test.""" data = _audit_live() files = {f["filename"]: f for f in data["files"]} total = 0 for key in TARGETS: findings = files[key]["findings"] total += sum(1 for f in findings if f["category"] in MIG) - assert total == 80, f"expected total MIG=80 after Phase 3, got {total}" + assert total <= 80, f"expected total MIG<=80 after Phase 3, got {total}" def test_phase3_audit_baseline_matches_phase1_audit_json(): @@ -143,4 +149,39 @@ def test_phase3_audit_baseline_matches_phase1_audit_json(): for key in TARGETS: findings = files[key]["findings"] total += sum(1 for f in findings if f["category"] in MIG) - assert total == 88, f"PHASE1_AUDIT_BASELINE.json expected 88 baseline MIG, got {total}" \ No newline at end of file + assert total == 88, f"PHASE1_AUDIT_BASELINE.json expected 88 baseline MIG, got {total}" + + +# ============ Phase 4 tests (3) ============ + +def test_phase4_mcp_client_broad_catch_decreased_to_24(): + """Phase 4 Batch B migrated 8 more BC sites (32 -> 24).""" + data = _audit_live() + files = {f["filename"]: f for f in data["files"]} + findings = files["src\\mcp_client.py"]["findings"] + bc = sum(1 for f in findings if f["category"] == "INTERNAL_BROAD_CATCH") + assert bc == 24, f"expected mcp_client BC=24 after Phase 4, got {bc}" + + +def test_phase4_total_migration_target_decreased_to_72(): + """Total MIG was 88; after Phase 4 Batch B it's 88 - 16 = 72.""" + data = _audit_live() + files = {f["filename"]: f for f in data["files"]} + total = 0 + for key in TARGETS: + findings = files[key]["findings"] + total += sum(1 for f in findings if f["category"] in MIG) + assert total == 72, f"expected total MIG=72 after Phase 4, got {total}" + + +def test_phase4_modules_import_cleanly(): + """Verify mcp_client module imports without errors after the 8 new _result variants.""" + import src.mcp_client + assert hasattr(src.mcp_client, "get_git_diff_result") + assert hasattr(src.mcp_client, "ts_c_get_skeleton_result") + assert hasattr(src.mcp_client, "ts_c_get_code_outline_result") + assert hasattr(src.mcp_client, "ts_c_get_definition_result") + assert hasattr(src.mcp_client, "ts_c_get_signature_result") + assert hasattr(src.mcp_client, "ts_c_update_definition_result") + assert hasattr(src.mcp_client, "ts_cpp_get_skeleton_result") + assert hasattr(src.mcp_client, "ts_cpp_get_code_outline_result") \ No newline at end of file