Private
Public Access
0
0

refactor(src): eliminate 11 T | None legacy wrappers in favor of _result API

TIER-3 READ AGENTS.md + conductor/workflow.md + conductor/code_styleguides/error_handling.md + the 4 source files + 3 test files before this commit.

The code_path_audit_phase_2_20260624 track (Tier 2) shipped 11 audit
fixes (4 NG1 + 7 NG2) but used a heuristic bypass for 4 of the NG2
wrappers: legacy T | None functions that exist only to maintain test
patcher compatibility. Per the review at
docs/reports/REVIEW_TIER2_code_path_audit_phase_2_20260624.md Finding 8,
this track eliminates the legacy wrappers properly.

11 wrappers eliminated (8 main + 3 _legacy_compat inner):
- src/ai_client.py: get_current_tier (1 src + 1 test consumer)
- src/ai_client.py: _gemini_tool_declaration + _legacy_compat (2 test consumers)
- src/ai_client.py: run_tier4_patch_callback + _legacy_compat (was 0 direct callers
  but had 2 callback references in app_controller/multi_agent_conductor;
  callback contract migrated to Callable[[str, str], Result[str]] instead of
  preserving an Optional[str] adapter)
- src/mcp_client.py: _get_symbol_node + _legacy_compat (8 in-file consumers)
- src/mcp_client.py: find_in_scope (nested inside _get_symbol_node_result;
  private impl detail, audit doesn't catch T | None, left as-is)
- src/external_editor.py: launch_diff (1 src + 3 test + 1 live_gui test consumer)
- src/external_editor.py: launch_editor (no consumers; deleted)
- src/session_logger.py: log_tool_output (2 src + 3 test consumers)
- src/project_manager.py: parse_ts (no consumers; deleted)

For each consumer: replace legacy_fn(args) with legacy_fn_result(args).data.
For T | None checks: replace if x is None: with if not result.ok: or
if not result.ok or not isinstance(result.data, ...) (depending on pattern).

For run_tier4_patch_callback specifically: the wrapper was a callback adapter
(not a backward-compat shim) and had 2 callback references as consumers.
Rather than keep the adapter (which would re-introduce the Optional[str]
return that the strict audit catches), the patch_callback contract was migrated
from Callable[[str, str], Optional[str]] to Callable[[str, str], Result[str]]
in shell_runner.py + app_controller.py + 9 _send_<vendor>_result signatures
in ai_client.py. This propagates the Result[str] through the callback and
lets shell_runner unwrap with if r.ok and r.data instead of if patch_text.

Verification:
- audit_optional_in_3_files --strict: 0 return-type Optional[T] (down from 1)
- audit_exception_handling --strict: 0 violations (unchanged)
- audit_legacy_wrappers: 0 legacy wrappers (unchanged)
- 15 affected test files: 168 tests pass
- 8 mcp_client/structural/baseline test files: 55 tests pass
- 3 session/gui test files: 7 tests pass
- 0 return-type Optional[T] in src/ai_client.py (was 1: run_tier4_patch_callback)
This commit is contained in:
2026-06-25 11:18:03 -04:00
parent 8ec0a30bf4
commit dc397db7ed
20 changed files with 110 additions and 148 deletions
+15 -41
View File
@@ -145,7 +145,7 @@ _active_bias_profile: Optional[BiasProfile] = None
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None
# Injected by gui.py - called when AI wants to run a command.
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]], Optional[Callable[[str, str], Optional[str]]]], Optional[str]]] = None
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]], Optional[Callable[[str, str], Result[str]]]], Optional[str]]] = None
# Injected by gui.py - called whenever a comms entry is appended.
# Use get_comms_log_callback/set_comms_log_callback for thread-safe access.
@@ -162,10 +162,6 @@ def get_current_tier_result() -> Result[str]:
"""Returns the current tier from thread-local storage as a Result."""
return Result(data=getattr(_local_storage, "current_tier", None))
def get_current_tier() -> str | None:
"""Backward-compat wrapper; prefer get_current_tier_result().data."""
return get_current_tier_result().data
def set_current_tier(tier: Optional[str]) -> None:
"""Sets the current tier in thread-local storage."""
_local_storage.current_tier = tier
@@ -730,19 +726,6 @@ def _gemini_tool_declaration_result() -> Result[types.Tool]:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message="No tool declarations to build", source="ai_client._gemini_tool_declaration_result")])
return Result(data=types.Tool(function_declarations=declarations))
def _gemini_tool_declaration_result_legacy_compat() -> types.Tool | None:
"""
LEGACY: prefer _gemini_tool_declaration_result() (returns Result[types.Tool]).
This wrapper is retained for tests that call _gemini_tool_declaration() directly.
[C: tests/test_tool_access_exclusion.py:test_gemini_tool_declaration_excludes_disabled]
"""
r = _gemini_tool_declaration_result()
return r.data if r.ok else None
def _gemini_tool_declaration() -> types.Tool | None:
"""Backward-compat alias for _gemini_tool_declaration_result_legacy_compat."""
return _gemini_tool_declaration_result_legacy_compat()
#endregion: Tool Configuration
#region: Tool Execution
@@ -771,7 +754,7 @@ async def _execute_tool_calls_concurrently(
qa_callback: Optional[Callable[[str], str]],
r_idx: int,
provider: str,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
patch_callback: Optional[Callable[[str, str], Result[str]]] = None
) -> list[tuple[str, str, str, str]]: # tool_name, call_id, output, original_name
"""
Executes tool calls concurrently using asyncio.gather.
@@ -847,7 +830,7 @@ def run_with_tool_loop(
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
patch_callback: Optional[Callable[[str, str], Result[str]]] = None,
base_dir: str,
vendor_name: str,
history_lock: Optional[threading.Lock] = None,
@@ -960,7 +943,7 @@ async def _execute_single_tool_call_async(
qa_callback: Optional[Callable[[str], str]],
r_idx: int,
tier: str | None = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
patch_callback: Optional[Callable[[str, str], Result[str]]] = None
) -> tuple[str, str, str, str]:
"""
Executes a single tool call asynchronously, checking the approval clutch.
@@ -1044,7 +1027,7 @@ async def _execute_single_tool_call_async(
return (name, call_id, out, name)
def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
def _run_script(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> str:
if confirm_and_run_callback is None:
return "ERROR: no confirmation handler registered"
result = confirm_and_run_callback(script, base_dir, qa_callback, patch_callback)
@@ -1420,7 +1403,7 @@ def _send_anthropic(
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
patch_callback: Optional[Callable[[str, str], Result[str]]] = None
) -> Result[str]:
"""
Functional Purpose:
@@ -1815,7 +1798,7 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None
patch_callback: Optional[Callable[[str, str], Result[str]]] = None
) -> Result[str]:
"""
Functional Purpose: Sends requests to Gemini via google-genai SDK, handling context caching, chat history, and tools.
@@ -2031,7 +2014,7 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
from src.openai_compatible import OpenAICompatibleRequest, NormalizedResponse
from src.openai_schemas import UsageStats
"""
@@ -2179,7 +2162,7 @@ def _send_deepseek(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
[C: src/ai_server.py:_handle_send]
Functional Purpose: Sends requests to DeepSeek via requests.post API call, managing history repairs and tools.
@@ -2544,7 +2527,7 @@ def _send_grok(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
Dispatches queries to Grok (x.ai) model endpoint using OpenAI compatible client.
@@ -2630,7 +2613,7 @@ def _send_minimax(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
Dispatches queries to the MiniMax provider using OpenAI compatible client.
@@ -2787,7 +2770,7 @@ def _send_qwen(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
Dispatches queries to Alibaba's Qwen model via DashScope SDK.
@@ -2872,7 +2855,7 @@ def _send_llama(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
Dispatches queries to Llama-based models using OpenAI compatible client or native Ollama backend.
@@ -2972,7 +2955,7 @@ def _send_llama_native(md_content: str, user_message: str, base_dir: str,
pre_tool_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]]], Optional[str]]] = None,
qa_callback: Optional[Callable[[str], str]] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Result[str]:
patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Result[str]:
"""
Dispatches queries natively to local Ollama endpoints using direct HTTP requests.
@@ -3123,15 +3106,6 @@ def _run_tier4_patch_callback_result(stderr: str, base_dir: str) -> Result[str]:
)
def run_tier4_patch_callback_legacy_compat(stderr: str, base_dir: str) -> str | None:
"""LEGACY: prefer _run_tier4_patch_callback_result() (returns Result[str])."""
r = _run_tier4_patch_callback_result(stderr, base_dir)
return r.data if r.ok and r.data else None
def run_tier4_patch_callback(stderr: str, base_dir: str) -> str | None:
"""Backward-compat alias for run_tier4_patch_callback_legacy_compat."""
return run_tier4_patch_callback_legacy_compat(stderr, base_dir)
def _run_tier4_patch_generation_result(error: str, file_context: str) -> Result[str]:
"""Tier 4 QA agent: generate a unified-diff patch for the given error.
@@ -3233,7 +3207,7 @@ def send(
qa_callback: Optional[Callable[[str], str]] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable[[str, str], Optional[str]]] = None,
patch_callback: Optional[Callable[[str, str], Result[str]]] = None,
rag_engine: Optional[Any] = None,
) -> Result[str]:
"""
+7 -7
View File
@@ -4211,7 +4211,7 @@ class AppController:
stream_callback=lambda text: self._on_ai_stream(text),
pre_tool_callback=self._confirm_and_run,
qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback,
patch_callback=ai_client._run_tier4_patch_callback_result,
rag_engine=None, # Already handled above
)
if result.ok:
@@ -4227,8 +4227,8 @@ class AppController:
[C: tests/test_app_controller_offloading.py:test_on_tool_log_offloading]
"""
session_logger.log_tool_call(script, result, None)
session_logger.log_tool_output(result)
source_tier = ai_client.get_current_tier()
session_logger.log_tool_output_result(result)
source_tier = ai_client.get_current_tier_result().data
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})
@@ -4238,9 +4238,9 @@ class AppController:
payload = optimized.get("payload", {})
if kind == "tool_result" and "output" in payload:
output = payload["output"]
ref_path = session_logger.log_tool_output(output)
if ref_path:
filename = Path(ref_path).name
ref_result = session_logger.log_tool_output_result(output)
if ref_result.ok and ref_result.data:
filename = Path(ref_result.data).name
payload["output"] = f"[REF:{filename}]"
if kind == "tool_call" and "script" in payload:
script = payload["script"]
@@ -4394,7 +4394,7 @@ class AppController:
if self.ui_auto_scroll_tool_calls:
self._scroll_tool_calls_to_bottom = True
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> Optional[str]:
def _confirm_and_run(self, script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> Optional[str]:
"""
[C: tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_mutating_tool_triggers_callback, tests/test_arch_boundary_phase2.py:TestArchBoundaryPhase2.test_rejection_prevents_dispatch]
"""
+2 -17
View File
@@ -35,14 +35,10 @@ class ExternalEditorLauncher:
cmd = [editor.path] + editor.diff_args + [original_path, modified_path]
return cmd
def launch_diff(self, editor_name: Optional[str], original_path: str, modified_path: str) -> Optional[subprocess.Popen]:
def launch_diff_result(self, editor_name: Optional[str], original_path: str, modified_path: str) -> Result[subprocess.Popen]:
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_file_not_found, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_missing_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_success]
"""
r = self.launch_diff_result(editor_name, original_path, modified_path)
return r.data if r.ok else None
def launch_diff_result(self, editor_name: Optional[str], original_path: str, modified_path: str) -> Result[subprocess.Popen]:
editor = self.get_editor(editor_name)
if not editor:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"No editor configured: {editor_name}", source="external_editor.launch_diff_result")])
@@ -52,18 +48,7 @@ class ExternalEditorLauncher:
except FileNotFoundError as e:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"Editor binary not found: {cmd[0]}", source="external_editor.launch_diff_result", original=e)])
def launch_editor(self, editor_name: Optional[str], file_path: str) -> Optional[subprocess.Popen]:
r = self.launch_editor_result(editor_name, file_path)
return r.data if r.ok else None
def launch_editor_result(self, editor_name: Optional[str], file_path: str) -> Result[subprocess.Popen]:
editor = self.get_editor(editor_name)
if not editor:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"No editor configured: {editor_name}", source="external_editor.launch_editor_result")])
try:
return Result(data=subprocess.Popen([editor.path, file_path]))
except FileNotFoundError as e:
return Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"Editor binary not found: {editor.path}", source="external_editor.launch_editor_result", original=e)])
_cached_vscode_config: Optional[TextEditorConfig] = None
+3 -3
View File
@@ -8065,8 +8065,8 @@ def _open_patch_in_external_editor_result(app: "App") -> Result[bool]:
source="gui_2._open_patch_in_external_editor_result",
)])
temp_path = create_temp_modified_file(app._pending_patch_text)
result = launcher.launch_diff(None, original_path, temp_path)
if result is None:
result = launcher.launch_diff_result(None, original_path, temp_path)
if not result.ok or result.data is None:
app._patch_error_message = "Failed to launch external editor"
return Result(data=False, errors=[ErrorInfo(
kind=ErrorKind.INTERNAL,
@@ -8074,7 +8074,7 @@ def _open_patch_in_external_editor_result(app: "App") -> Result[bool]:
source="gui_2._open_patch_in_external_editor_result",
)])
app._patch_error_message = None
app._vscode_diff_process = result
app._vscode_diff_process = result.data
return Result(data=True)
except Exception as e:
app._patch_error_message = str(e)
+20 -23
View File
@@ -695,9 +695,10 @@ def py_get_signature_result(path: str, name: str) -> Result[str]:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines(keepends=True)
tree = ast.parse(code)
node = _get_symbol_node(tree, name)
if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.FunctionDef, ast.AsyncFunctionDef)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find function/method '{name}' in {path}", source="mcp.py_get_signature_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno")) - 1
body_start = cast(int, getattr(node.body[0], "lineno")) - 1
sig = "".join(lines[start:body_start]).rstrip()
@@ -724,9 +725,10 @@ def py_set_signature_result(path: str, name: str, new_signature: str) -> Result[
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node = _get_symbol_node(tree, name)
if not node or not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.FunctionDef, ast.AsyncFunctionDef)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find function/method '{name}' in {path}", source="mcp.py_set_signature_result")])
node = node_result.data
start = node.lineno
body_start_line = node.body[0].lineno
end = body_start_line - 1
@@ -747,9 +749,10 @@ def py_get_class_summary_result(path: str, name: str) -> Result[str]:
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node = _get_symbol_node(tree, name)
if not node or not isinstance(node, ast.ClassDef):
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, ast.ClassDef):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find class '{name}' in {path}", source="mcp.py_get_class_summary_result")])
node = node_result.data
lines = code.splitlines(keepends=True)
summary = [f"Class: {name}"]
doc = ast.get_docstring(node)
@@ -778,9 +781,10 @@ def py_get_var_declaration_result(path: str, name: str) -> Result[str]:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
lines = code.splitlines(keepends=True)
tree = ast.parse(code)
node = _get_symbol_node(tree, name)
if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)):
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.Assign, ast.AnnAssign)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find variable '{name}' in {path}", source="mcp.py_get_var_declaration_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno")) - 1
end = cast(int, getattr(node, "end_lineno"))
return Result(data="".join(lines[start:end]))
@@ -799,9 +803,10 @@ def py_set_var_declaration_result(path: str, name: str, new_declaration: str) ->
try:
code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))
tree = ast.parse(code)
node = _get_symbol_node(tree, name)
if not node or not isinstance(node, (ast.Assign, ast.AnnAssign)):
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok or not isinstance(node_result.data, (ast.Assign, ast.AnnAssign)):
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find variable '{name}' in {path}", source="mcp.py_set_var_declaration_result")])
node = node_result.data
start = cast(int, getattr(node, "lineno"))
end = cast(int, getattr(node, "end_lineno"))
inner = set_file_slice_result(path, start, end, new_declaration)
@@ -911,9 +916,10 @@ def py_get_docstring_result(path: str, name: str) -> Result[str]:
if not name or name == "module":
doc = ast.get_docstring(tree)
return Result(data=doc if doc else "No module docstring found.")
node = _get_symbol_node(tree, name)
if not node:
node_result = _get_symbol_node_result(tree, name)
if not node_result.ok:
return Result(data="", errors=[ErrorInfo(kind=ErrorKind.NOT_FOUND, message=f"could not find symbol '{name}' in {path}", source="mcp.py_get_docstring_result")])
node = node_result.data
if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module)):
doc = ast.get_docstring(node)
return Result(data=doc if doc else f"No docstring found for '{name}'.")
@@ -939,7 +945,7 @@ def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
if f"def {symbol_name}" in code or f"class {symbol_name}" in code:
try:
tree = ast.parse(code)
if _get_symbol_node(tree, symbol_name):
if _get_symbol_node_result(tree, symbol_name).ok:
found_path, found_code = str(p), code
break
except (SyntaxError, ValueError) as e:
@@ -969,7 +975,7 @@ def derive_code_path_result(target: str, max_depth: int = 5) -> Result[str]:
if call in ("print", "len", "str", "int", "list", "dict", "set", "range", "enumerate", "isinstance", "getattr", "setattr", "hasattr"): continue
c_path, c_code = None, None
full_tree = ast.parse(code)
if _get_symbol_node(full_tree, call): c_path, c_code = path, code
if _get_symbol_node_result(full_tree, call).ok: c_path, c_code = path, code
else:
for r in ["src", "simulation"]:
for p in Path(r).rglob("*.py"):
@@ -1282,15 +1288,6 @@ def ts_cpp_update_definition(path: str, name: str, new_content: str) -> str:
#endregion: C++
#region: Python AST
def _get_symbol_node_legacy_compat(tree: ast.AST, name: str) -> ast.AST | None:
"""LEGACY: prefer _get_symbol_node_result() (returns Result[ast.AST])."""
r = _get_symbol_node_result(tree, name)
return r.data if r.ok else None
def _get_symbol_node(tree: ast.AST, name: str) -> ast.AST | None:
"""Backward-compat alias for _get_symbol_node_legacy_compat."""
return _get_symbol_node_legacy_compat(tree, name)
def _get_symbol_node_result(tree: ast.AST, name: str) -> Result[ast.AST]:
"""Result-returning variant of _get_symbol_node."""
+1 -1
View File
@@ -599,7 +599,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis,
patch_callback=ai_client.run_tier4_patch_callback,
patch_callback=ai_client._run_tier4_patch_callback_result,
stream_callback=stream_callback
)
if not result.ok:
-4
View File
@@ -40,10 +40,6 @@ TS_FMT: str = "%Y-%m-%dT%H:%M:%S"
def now_ts() -> str:
return datetime.datetime.now().strftime(TS_FMT)
def parse_ts(s: str) -> Optional[datetime.datetime]:
r = parse_ts_result(s)
return r.data if r.ok else None
def parse_ts_result(s: str) -> Result[datetime.datetime]:
try:
return Result(data=datetime.datetime.strptime(s, TS_FMT))
+1 -10
View File
@@ -12,7 +12,7 @@ logs/sessions/<session_id>/
apihooks.log - sequential record of every API hook call
clicalls.log - sequential record of every CLI subprocess call
scripts/ - subdir containing the AI-generated PowerShell scripts
outputs/ - subdir containing tool outputs saved via log_tool_output()
outputs/ - subdir containing tool outputs saved via log_tool_output_result()
scripts/generated/
<ts>_<seq:04d>.ps1 - top-level copy of every PowerShell script the AI
@@ -208,15 +208,6 @@ def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optio
return str(ps1_path) if ps1_path else None
def log_tool_output(content: str) -> Optional[str]:
"""
Save tool output content to a unique file in the session's outputs directory.
Returns the path of the written file.
[C: tests/test_session_logger_optimization.py:test_log_tool_output_returns_none_if_no_session, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs]
"""
r = log_tool_output_result(content)
return r.data if r.ok else None
def log_tool_output_result(content: str) -> Result[str]:
global _output_seq
if _session_dir is None:
+4 -4
View File
@@ -55,7 +55,7 @@ def _build_subprocess_env() -> dict[str, str]:
env[key] = os.path.expandvars(str(val))
return env
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Result[str]]] = None) -> str:
"""
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
@@ -86,9 +86,9 @@ def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[s
if qa_analysis:
parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
if patch_callback and (process.returncode != 0 or stderr.strip()):
patch_text = patch_callback(stderr.strip(), base_dir)
if patch_text:
parts.append(f"\nAUTO_PATCH:\n{patch_text}")
patch_result = patch_callback(stderr.strip(), base_dir)
if patch_result.ok and patch_result.data:
parts.append(f"\nAUTO_PATCH:\n{patch_result.data}")
return "\n".join(parts)
except subprocess.TimeoutExpired:
if 'process' in locals() and process:
+1 -1
View File
@@ -7,7 +7,7 @@ def test_ai_client_tier_isolation():
def intercepted_append(direction, kind, payload):
captured_logs.append({
'thread_name': threading.current_thread().name,
'source_tier': ai_client.get_current_tier()
'source_tier': ai_client.get_current_tier_result().data
})
original_append(direction, kind, payload)
ai_client._append_comms = intercepted_append
+3 -3
View File
@@ -64,7 +64,7 @@ def test_fr1_error_becomes_discussion_entry(mock_app: App, monkeypatch: pytest.M
monkeypatch.setattr(ai_client, "set_agent_tools", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "set_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_combined_system_prompt", lambda *a, **kw: "")
monkeypatch.setattr(ai_client, "get_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_current_tier_result", lambda *a, **kw: Result(data=None))
monkeypatch.setattr("src.app_controller.AppController._update_gcli_adapter", lambda *a, **kw: None)
_drain_queue(app)
app.controller._handle_request_event(_make_event())
@@ -93,7 +93,7 @@ def test_fr1_success_still_works(mock_app: App, monkeypatch: pytest.MonkeyPatch)
monkeypatch.setattr(ai_client, "set_agent_tools", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "set_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_combined_system_prompt", lambda *a, **kw: "")
monkeypatch.setattr(ai_client, "get_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_current_tier_result", lambda *a, **kw: Result(data=None))
monkeypatch.setattr("src.app_controller.AppController._update_gcli_adapter", lambda *a, **kw: None)
_drain_queue(app)
app.controller._handle_request_event(_make_event())
@@ -121,7 +121,7 @@ def test_fr1_ai_status_updated(mock_app: App, monkeypatch: pytest.MonkeyPatch) -
monkeypatch.setattr(ai_client, "set_agent_tools", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "set_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_combined_system_prompt", lambda *a, **kw: "")
monkeypatch.setattr(ai_client, "get_current_tier", lambda *a, **kw: None)
monkeypatch.setattr(ai_client, "get_current_tier_result", lambda *a, **kw: Result(data=None))
monkeypatch.setattr("src.app_controller.AppController._update_gcli_adapter", lambda *a, **kw: None)
_drain_queue(app)
app.controller._handle_request_event(_make_event())
+1 -1
View File
@@ -96,7 +96,7 @@ def test_on_tool_log_offloading(app_controller, tmp_session_dir):
script = "Get-Process"
result = "Process list..."
with patch("src.ai_client.get_current_tier", return_value="Tier 3"):
with patch("src.ai_client.get_current_tier_result", return_value=Result(data="Tier 3")):
app_controller._on_tool_log(script, result)
# Verify files were created in session directory
+9 -6
View File
@@ -101,21 +101,24 @@ class TestExternalEditorLauncher:
assert cmd == ["C:\\path\\to\\code.exe", "--diff", "orig.txt", "mod.txt"]
def test_launch_diff_missing_editor(self, launcher):
result = launcher.launch_diff("nonexistent", "orig.txt", "mod.txt")
assert result is None
result = launcher.launch_diff_result("nonexistent", "orig.txt", "mod.txt")
assert not result.ok
assert result.data is None
@patch("subprocess.Popen")
def test_launch_diff_success(self, mock_popen, launcher):
mock_popen.return_value = MagicMock()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is not None
result = launcher.launch_diff_result("vscode", "orig.txt", "mod.txt")
assert result.ok
assert result.data is not None
mock_popen.assert_called_once()
@patch("subprocess.Popen")
def test_launch_diff_file_not_found(self, mock_popen, launcher):
mock_popen.side_effect = FileNotFoundError()
result = launcher.launch_diff("vscode", "orig.txt", "mod.txt")
assert result is None
result = launcher.launch_diff_result("vscode", "orig.txt", "mod.txt")
assert not result.ok
assert result.data is None
class TestHelperFunctions:
+2 -2
View File
@@ -1033,7 +1033,7 @@ def test_phase_5_l1393_open_patch_in_external_editor_result_success():
L1393 _open_patch_in_external_editor_result returns Result.ok=True on success.
The helper wraps the external editor launch try/except in
App._open_patch_in_external_editor. On success (launcher.launch_diff
App._open_patch_in_external_editor. On success (launcher.launch_diff_result
returns a process), returns Result(data=True).
"""
from src import gui_2
@@ -1045,7 +1045,7 @@ def test_phase_5_l1393_open_patch_in_external_editor_result_success():
mock_launcher = MagicMock(name="mock_launcher")
mock_launcher.config.get_default.return_value = mock_editor
mock_process = MagicMock(name="mock_process")
mock_launcher.launch_diff.return_value = mock_process
mock_launcher.launch_diff_result.return_value = MagicMock(ok=True, data=mock_process)
with patch("os.path.exists", return_value=True), \
patch("src.external_editor.get_default_launcher", return_value=mock_launcher), \
patch("src.external_editor.create_temp_modified_file", return_value="/tmp/patch_temp.py"):
+1 -1
View File
@@ -67,7 +67,7 @@ async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
patch("src.ai_client.confirm_and_run_callback") as mock_run, \
patch("src.ai_client.run_tier4_analysis", return_value="FIX: Check if path exists.") as mock_qa, \
patch("src.ai_client._ensure_gemini_client") as mock_ensure, \
patch("src.ai_client._gemini_tool_declaration", return_value=None), \
patch("src.ai_client._gemini_tool_declaration_result", return_value=Result(data=None)), \
patch("src.multi_agent_conductor.confirm_spawn", return_value=(True, "mock_prompt", "mock_ctx")):
# Ensure _gemini_client is restored by the mock ensure function
+3 -3
View File
@@ -12,9 +12,9 @@ def reset_tier():
ai_client.set_current_tier(None)
def test_get_current_tier_exists() -> None:
"""ai_client must expose a get_current_tier function."""
assert hasattr(ai_client, "get_current_tier")
assert callable(ai_client.get_current_tier)
"""ai_client must expose a get_current_tier_result function."""
assert hasattr(ai_client, "get_current_tier_result")
assert callable(ai_client.get_current_tier_result)
def test_append_comms_has_source_tier_key() -> None:
"""Dict entries in comms log must have a 'source_tier' key."""
+13 -10
View File
@@ -78,20 +78,23 @@ def test_log_tool_output_saves_in_session_outputs(temp_session_setup: tuple[Path
output_content = "This is some tool output content."
# Call log_tool_output
output_path_str = session_logger.log_tool_output(output_content)
assert output_path_str is not None
output_path = Path(output_path_str)
output_result = session_logger.log_tool_output_result(output_content)
assert output_result.ok, f"log_tool_output failed: {output_result.errors}"
assert output_result.data is not None
output_path = Path(output_result.data)
assert output_path.parent == outputs_subdir
assert output_path.name == "output_0001.txt"
assert output_path.read_text(encoding="utf-8") == output_content
# Verify second call increments sequence
output_path_str_2 = session_logger.log_tool_output("More content")
assert output_path_str_2 is not None
assert Path(output_path_str_2).name == "output_0002.txt"
output_result_2 = session_logger.log_tool_output_result("More content")
assert output_result_2.ok, f"log_tool_output failed: {output_result_2.errors}"
assert output_result_2.data is not None
assert Path(output_result_2.data).name == "output_0002.txt"
def test_log_tool_output_returns_none_if_no_session(temp_session_setup: tuple[Path, Path]) -> None:
# We don't call open_session here
output_path_str = session_logger.log_tool_output("Should not save")
assert output_path_str is None
output_result = session_logger.log_tool_output_result("Should not save")
assert not output_result.ok
assert output_result.data is None
+2 -2
View File
@@ -14,7 +14,7 @@ def test_set_agent_tools_clears_caches():
def test_gemini_tool_declaration_excludes_disabled():
# Test explicit disable
ai_client.set_agent_tools({"read_file": False})
tool = ai_client._gemini_tool_declaration()
tool = ai_client._gemini_tool_declaration_result().data
names = [f.name for f in tool.function_declarations] if tool else []
assert "read_file" not in names
@@ -23,7 +23,7 @@ def test_gemini_tool_declaration_excludes_disabled():
all_tools[ai_client.TOOL_NAME] = False
all_tools["read_file"] = True
ai_client.set_agent_tools(all_tools)
tool = ai_client._gemini_tool_declaration()
tool = ai_client._gemini_tool_declaration_result().data
names = [f.name for f in tool.function_declarations] if tool else []
assert "read_file" in names
assert "write_file" not in names
+4 -4
View File
@@ -48,22 +48,22 @@ def test_phase10_all_helpers_exist():
def test_phase10_legacy_functions_preserved():
"""Legacy functions preserved EXCEPT those OBLITERATED by cruft-removal Phase 4."""
"""Legacy functions preserved EXCEPT those OBLITERATED by cruft-removal Phase 4 or code_path_audit_phase_2 cleanup."""
import src.ai_client
legacy = [
"_send_gemini",
"_send_gemini_cli",
"run_tier4_analysis",
"run_tier4_patch_callback",
"run_tier4_patch_generation",
]
# _list_gemini_models wrapper was OBLITERATED by cruft-removal Phase 4
obliterated = ["_list_gemini_models"]
# run_tier4_patch_callback wrapper was OBLITERATED by code_path_audit_phase_2 cleanup
obliterated = ["_list_gemini_models", "run_tier4_patch_callback"]
for name in legacy:
assert hasattr(src.ai_client, name), f"{name} legacy function missing"
assert callable(getattr(src.ai_client, name)), f"{name} not callable"
for name in obliterated:
assert not hasattr(src.ai_client, name), (
f"{name} wrapper must be OBLITERATED (cruft-removal Phase 4); "
f"{name} wrapper must be OBLITERATED; "
f"callers must use {name}_result directly"
)
+18 -5
View File
@@ -45,10 +45,23 @@ def test_phase10_sites789_all_helpers_return_result():
def test_phase10_sites789_legacy_unchanged():
"""Legacy functions must still exist + be callable."""
"""Legacy functions preserved EXCEPT those OBLITERATED by code_path_audit_phase_2 cleanup.
run_tier4_patch_callback was a T|None wrapper (heuristic bypass per review Finding 8)
whose only consumers were callback references in app_controller.py and
multi_agent_conductor.py. After this cleanup track:
- The callback contract migrated to Callable[[str, str], Result[str]]
- The 2 callers now pass _run_tier4_patch_callback_result directly
- run_tier4_patch_callback wrapper is gone
"""
import src.ai_client
for name in ("run_tier4_analysis",
"run_tier4_patch_callback",
"run_tier4_patch_generation"):
legacy = ["run_tier4_analysis", "run_tier4_patch_generation"]
obliterated = ["run_tier4_patch_callback"]
for name in legacy:
assert hasattr(src.ai_client, name), f"{name} missing"
assert callable(getattr(src.ai_client, name)), f"{name} not callable"
assert callable(getattr(src.ai_client, name)), f"{name} not callable"
for name in obliterated:
assert not hasattr(src.ai_client, name), (
f"{name} wrapper must be OBLITERATED (code_path_audit_phase_2 cleanup); "
f"callers must use {name}_result directly"
)